CameraManager: no need for locks

This commit is contained in:
Thomas Farstrike
2026-01-14 19:31:47 +01:00
parent d1ce153ca3
commit 3a5f7ca212
5 changed files with 37 additions and 76 deletions
+1
View File
@@ -9,6 +9,7 @@
- App framework: simplify MANIFEST.JSON
- AudioFlinger framework: simplify import, use singleton class
- Create new SettingsActivity and SettingActivity framework so apps can easily add settings screens with just a few lines of code
- Create CameraManager framework so apps can easily check whether there is a camera available etc.
- Improve robustness by catching unhandled app exceptions
- Improve robustness with custom exception that does not deinit() the TaskHandler
- Improve robustness by removing TaskHandler callback that throws an uncaught exception
+2 -1
View File
@@ -10,6 +10,7 @@ from .content.intent import Intent
from .activity_navigator import ActivityNavigator
from .content.package_manager import PackageManager
from .task_manager import TaskManager
from . import camera_manager as CameraManager
# Common activities
from .app.activities.chooser import ChooserActivity
@@ -64,7 +65,7 @@ __all__ = [
"Activity",
"SharedPreferences",
"ConnectivityManager", "DownloadManager", "WifiService", "AudioFlinger", "Intent",
"ActivityNavigator", "PackageManager", "TaskManager",
"ActivityNavigator", "PackageManager", "TaskManager", "CameraManager",
# Common activities
"ChooserActivity", "ViewActivity", "ShareActivity",
"SettingActivity", "SettingsActivity", "CameraActivity",
+16 -56
View File
@@ -22,11 +22,6 @@ MIT License
Copyright (c) 2024 MicroPythonOS contributors
"""
try:
import _thread
_lock = _thread.allocate_lock()
except ImportError:
_lock = None
# Camera lens facing constants (matching Android Camera2 API)
@@ -105,22 +100,15 @@ def add_camera(camera):
print(f"[CameraManager] Error: add_camera() requires Camera object, got {type(camera)}")
return False
if _lock:
_lock.acquire()
try:
# Check if camera with same facing already exists
for existing in _cameras:
if existing.lens_facing == camera.lens_facing:
print(f"[CameraManager] Warning: Camera with facing {camera.lens_facing} already registered")
# Still add it (allow multiple cameras with same facing)
_cameras.append(camera)
print(f"[CameraManager] Registered camera: {camera}")
return True
finally:
if _lock:
_lock.release()
# Check if camera with same facing already exists
for existing in _cameras:
if existing.lens_facing == camera.lens_facing:
print(f"[CameraManager] Warning: Camera with facing {camera.lens_facing} already registered")
# Still add it (allow multiple cameras with same facing)
_cameras.append(camera)
print(f"[CameraManager] Registered camera: {camera}")
return True
def get_cameras():
@@ -129,14 +117,7 @@ def get_cameras():
Returns:
list: List of Camera objects (copy of internal list)
"""
if _lock:
_lock.acquire()
try:
return _cameras.copy() if _cameras else []
finally:
if _lock:
_lock.release()
return _cameras.copy() if _cameras else []
def get_camera_by_facing(lens_facing):
@@ -148,17 +129,10 @@ def get_camera_by_facing(lens_facing):
Returns:
Camera object or None if not found
"""
if _lock:
_lock.acquire()
try:
for camera in _cameras:
if camera.lens_facing == lens_facing:
return camera
return None
finally:
if _lock:
_lock.release()
for camera in _cameras:
if camera.lens_facing == lens_facing:
return camera
return None
def has_camera():
@@ -167,14 +141,7 @@ def has_camera():
Returns:
bool: True if at least one camera available
"""
if _lock:
_lock.acquire()
try:
return len(_cameras) > 0
finally:
if _lock:
_lock.release()
return len(_cameras) > 0
def get_camera_count():
@@ -183,14 +150,7 @@ def get_camera_count():
Returns:
int: Number of cameras
"""
if _lock:
_lock.acquire()
try:
return len(_cameras)
finally:
if _lock:
_lock.release()
return len(_cameras)
# Initialize on module load
+17 -18
View File
@@ -34,7 +34,7 @@ class TestAudioFlinger(unittest.TestCase):
# Reset volume to default before each test
AudioFlinger.set_volume(70)
AudioFlinger.init(
AudioFlinger(
i2s_pins=self.i2s_pins,
buzzer_instance=self.buzzer
)
@@ -52,21 +52,21 @@ class TestAudioFlinger(unittest.TestCase):
def test_has_i2s(self):
"""Test has_i2s() returns correct value."""
# With I2S configured
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertTrue(AudioFlinger.has_i2s())
# Without I2S configured
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
AudioFlinger(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioFlinger.has_i2s())
def test_has_buzzer(self):
"""Test has_buzzer() returns correct value."""
# With buzzer configured
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
AudioFlinger(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertTrue(AudioFlinger.has_buzzer())
# Without buzzer configured
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins, buzzer_instance=None)
self.assertFalse(AudioFlinger.has_buzzer())
def test_stream_types(self):
@@ -95,7 +95,7 @@ class TestAudioFlinger(unittest.TestCase):
def test_no_hardware_rejects_playback(self):
"""Test that no hardware rejects all playback requests."""
# Re-initialize with no hardware
AudioFlinger.init(i2s_pins=None, buzzer_instance=None)
AudioFlinger(i2s_pins=None, buzzer_instance=None)
# WAV should be rejected (no I2S)
result = AudioFlinger.play_wav("test.wav")
@@ -108,7 +108,7 @@ class TestAudioFlinger(unittest.TestCase):
def test_i2s_only_rejects_rtttl(self):
"""Test that I2S-only config rejects buzzer playback."""
# Re-initialize with I2S only
AudioFlinger.init(i2s_pins=self.i2s_pins, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins, buzzer_instance=None)
# RTTTL should be rejected (no buzzer)
result = AudioFlinger.play_rtttl("Test:d=4,o=5,b=120:c")
@@ -117,7 +117,7 @@ class TestAudioFlinger(unittest.TestCase):
def test_buzzer_only_rejects_wav(self):
"""Test that buzzer-only config rejects I2S playback."""
# Re-initialize with buzzer only
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
AudioFlinger(i2s_pins=None, buzzer_instance=self.buzzer)
# WAV should be rejected (no I2S)
result = AudioFlinger.play_wav("test.wav")
@@ -142,7 +142,7 @@ class TestAudioFlinger(unittest.TestCase):
def test_volume_default_value(self):
"""Test that default volume is reasonable."""
# After init, volume should be at default (70)
AudioFlinger.init(i2s_pins=None, buzzer_instance=None)
AudioFlinger(i2s_pins=None, buzzer_instance=None)
self.assertEqual(AudioFlinger.get_volume(), 70)
@@ -162,7 +162,7 @@ class TestAudioFlingerRecording(unittest.TestCase):
af._current_recording = None
AudioFlinger.set_volume(70)
AudioFlinger.init(
AudioFlinger(
i2s_pins=self.i2s_pins_with_mic,
buzzer_instance=self.buzzer
)
@@ -173,17 +173,17 @@ class TestAudioFlingerRecording(unittest.TestCase):
def test_has_microphone_with_sd_in(self):
"""Test has_microphone() returns True when sd_in pin is configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_with_mic, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins_with_mic, buzzer_instance=None)
self.assertTrue(AudioFlinger.has_microphone())
def test_has_microphone_without_sd_in(self):
"""Test has_microphone() returns False when sd_in pin is not configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
self.assertFalse(AudioFlinger.has_microphone())
def test_has_microphone_no_i2s(self):
"""Test has_microphone() returns False when no I2S is configured."""
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
AudioFlinger(i2s_pins=None, buzzer_instance=self.buzzer)
self.assertFalse(AudioFlinger.has_microphone())
def test_is_recording_initially_false(self):
@@ -192,15 +192,14 @@ class TestAudioFlingerRecording(unittest.TestCase):
def test_record_wav_no_microphone(self):
"""Test that record_wav() fails when no microphone is configured."""
AudioFlinger.init(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
AudioFlinger(i2s_pins=self.i2s_pins_no_mic, buzzer_instance=None)
result = AudioFlinger.record_wav("test.wav")
self.assertFalse(result)
self.assertFalse(result, "record_wav() fails when no microphone is configured")
def test_record_wav_no_i2s(self):
"""Test that record_wav() fails when no I2S is configured."""
AudioFlinger.init(i2s_pins=None, buzzer_instance=self.buzzer)
AudioFlinger(i2s_pins=None, buzzer_instance=self.buzzer)
result = AudioFlinger.record_wav("test.wav")
self.assertFalse(result)
self.assertFalse(result, "record_wav() should fail when no I2S is configured")
def test_stop_with_no_recording(self):
"""Test that stop() can be called when nothing is recording."""
+1 -1
View File
@@ -1,3 +1,4 @@
import unittest
import sys
import os
@@ -295,4 +296,3 @@ class TestCameraManagerUsagePattern(unittest.TestCase):
self.assertFalse(has_camera)