From a50e3722c93d594be64c5750f3b738b96a71c2fc Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Tue, 27 Jan 2026 13:41:15 +0100 Subject: [PATCH] DownloadManager: don't share aiohttp.ClientSession It seems to cause SSL/TLS session corruption on ESP32. There is a performance impact, so maybe it should be reintroduced again later, but for now, let's keep it simple and fix this bug. --- .../lib/mpos/net/download_manager.py | 597 +++--------------- tests/test_download_manager.py | 56 +- 2 files changed, 100 insertions(+), 553 deletions(-) diff --git a/internal_filesystem/lib/mpos/net/download_manager.py b/internal_filesystem/lib/mpos/net/download_manager.py index cd0a51f5..564efd5b 100644 --- a/internal_filesystem/lib/mpos/net/download_manager.py +++ b/internal_filesystem/lib/mpos/net/download_manager.py @@ -1,74 +1,21 @@ """ -download_manager.py - Centralized download management for MicroPythonOS +download_manager.py - HTTP download service for MicroPythonOS -Provides async HTTP download with flexible output modes: +Provides async HTTP downloads with flexible output modes: - Download to memory (returns bytes) - Download to file (returns bool) - Streaming with chunk callback (returns bool) Features: -- Shared aiohttp.ClientSession for performance -- Automatic session lifecycle management -- Thread-safe session access - Retry logic (3 attempts per chunk, 10s timeout) - Progress tracking with 2-decimal precision - Download speed reporting - Resume support via Range headers - Network error detection utilities - -Class Methods: - DownloadManager.download_url(...) - Download with flexible output modes - DownloadManager.is_session_active() - Check if session is active - DownloadManager.close_session() - Explicitly close session - DownloadManager.is_network_error(exception) - Check if error is recoverable - DownloadManager.get_resume_position(outfile) - Get file size for resume - -Example: - from mpos import DownloadManager - - # Download to memory - data = await DownloadManager.download_url("https://api.example.com/data.json") - - # Download to file with progress and speed - async def on_progress(pct): - print(f"{pct:.2f}%") # e.g., "45.67%" - - async def on_speed(speed_bps): - print(f"{speed_bps / 1024:.1f} KB/s") - - success = await DownloadManager.download_url( - "https://example.com/file.bin", - outfile="/sdcard/file.bin", - progress_callback=on_progress, - speed_callback=on_speed - ) - - # Stream processing - async def process_chunk(chunk): - # Process each chunk as it arrives - pass - - success = await DownloadManager.download_url( - "https://example.com/stream", - chunk_callback=process_chunk - ) - - # Error handling with retry - try: - await DownloadManager.download_url(url, outfile="/sdcard/file.bin") - except Exception as e: - if DownloadManager.is_network_error(e): - # Wait and retry with resume - await asyncio.sleep(2) - resume_from = DownloadManager.get_resume_position("/sdcard/file.bin") - headers = {'Range': f'bytes={resume_from}-'} if resume_from > 0 else None - await DownloadManager.download_url(url, outfile="/sdcard/file.bin", headers=headers) - else: - raise # Fatal error """ # Constants -_DEFAULT_CHUNK_SIZE = 1024 # 1KB chunks +_DEFAULT_CHUNK_SIZE = 2048 # 2KB chunks _DEFAULT_TOTAL_SIZE = 100 * 1024 # 100KB default if Content-Length missing _MAX_RETRIES = 3 # Retry attempts per chunk _CHUNK_TIMEOUT_SECONDS = 10 # Timeout per chunk read @@ -76,363 +23,73 @@ _SPEED_UPDATE_INTERVAL_MS = 1000 # Update speed every 1 second class DownloadManager: - """ - Centralized HTTP download service with flexible output modes. - Implements singleton pattern for shared aiohttp session. + """Centralized HTTP download service with flexible output modes.""" - Usage: - from mpos import DownloadManager + @classmethod + def download_url(cls, url, outfile=None, total_size=None, + progress_callback=None, chunk_callback=None, headers=None, + speed_callback=None): + """Download a URL with flexible output modes (sync or async wrapper). - # Download to memory (use module-level function for cleaner API) - data = await download_url("https://api.example.com/data.json") + This method automatically detects whether it's being called from an async context + and either returns a coroutine (for await) or runs synchronously. - # Or use class methods directly - data = await DownloadManager.download_url("https://api.example.com/data.json") + Args: + url (str): URL to download (required) + outfile (str, optional): Path to write file. If None, returns bytes. + total_size (int, optional): Expected size in bytes for progress tracking. + progress_callback (coroutine, optional): async def callback(percent: float) + chunk_callback (coroutine, optional): async def callback(chunk: bytes) + headers (dict, optional): HTTP headers (e.g., {'Range': 'bytes=1000-'}) + speed_callback (coroutine, optional): async def callback(bytes_per_second: float) - # Download to file - success = await DownloadManager.download_url( - "https://example.com/file.bin", - outfile="/sdcard/file.bin" - ) - """ - - _instance = None - - def __init__(self): - """Initialize DownloadManager singleton instance.""" - if DownloadManager._instance: - return - DownloadManager._instance = self + Returns: + bytes: Downloaded content (if outfile and chunk_callback are None) + bool: True if successful (when using outfile or chunk_callback) + coroutine: If called from async context, returns awaitable - self._session = None - self._session_lock = None - self._session_refcount = 0 - - # Initialize thread safety + Raises: + ValueError: If both outfile and chunk_callback are provided + """ + # Check if we're in an async context try: - import _thread - self._session_lock = _thread.allocate_lock() - print("DownloadManager: Initialized with thread safety") + import asyncio + try: + asyncio.current_task() + # We're in an async context, return the coroutine + return cls._download_url_async(url, outfile, total_size, + progress_callback, chunk_callback, headers, + speed_callback) + except RuntimeError: + # No running event loop, run synchronously + return asyncio.run(cls._download_url_async(url, outfile, total_size, + progress_callback, chunk_callback, headers, + speed_callback)) except ImportError: - # Desktop mode without threading support - self._session_lock = None - print("DownloadManager: Initialized without thread safety") + # asyncio not available, shouldn't happen but handle gracefully + raise ImportError("asyncio module not available") @classmethod - def _get_instance(cls): - """Get or create the singleton instance (internal use).""" - if cls._instance is None: - cls._instance = cls() - return cls._instance - - def _get_session(self): - """Get or create the shared aiohttp session (thread-safe). - - Returns: - aiohttp.ClientSession or None: The session instance, or None if aiohttp unavailable - """ - # Thread-safe session creation - if self._session_lock: - self._session_lock.acquire() - - try: - if self._session is None: - try: - import aiohttp - self._session = aiohttp.ClientSession() - print("DownloadManager: Created new aiohttp session") - except ImportError: - print("DownloadManager: aiohttp not available") - return None - return self._session - finally: - if self._session_lock: - self._session_lock.release() - - async def _close_session_if_idle(self): - """Close session if no downloads are active (thread-safe). - - Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing. - Sessions are automatically closed via "Connection: close" header. - This function is kept for potential future enhancements. - """ - if self._session_lock: - self._session_lock.acquire() - - try: - if self._session and self._session_refcount == 0: - # MicroPythonOS aiohttp doesn't have close() method - # Sessions close automatically, so just clear the reference - self._session = None - print("DownloadManager: Cleared idle session reference") - finally: - if self._session_lock: - self._session_lock.release() - - def _is_session_active(self): - """Check if a session is currently active (instance method). - - Returns: - bool: True if session exists and is open - """ - if self._session_lock: - self._session_lock.acquire() - - try: - return self._session is not None - finally: - if self._session_lock: - self._session_lock.release() - - async def _close_session(self): - """Explicitly close the session (instance method, optional, normally auto-managed). - - Useful for testing or forced cleanup. Session will be recreated - on next download_url() call. - - Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing. - Sessions are automatically closed via "Connection: close" header. - This function clears the session reference to allow garbage collection. - """ - if self._session_lock: - self._session_lock.acquire() - - try: - if self._session: - # MicroPythonOS aiohttp doesn't have close() method - # Just clear the reference to allow garbage collection - self._session = None - print("DownloadManager: Explicitly cleared session reference") - finally: - if self._session_lock: - self._session_lock.release() - - @classmethod - def is_session_active(cls): - """Check if a session is currently active. - - Returns: - bool: True if session exists and is open - """ - instance = cls._get_instance() - return instance._is_session_active() - - @classmethod - async def close_session(cls): - """Explicitly close the session (optional, normally auto-managed). - - Useful for testing or forced cleanup. Session will be recreated - on next download_url() call. - - Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing. - Sessions are automatically closed via "Connection: close" header. - This function clears the session reference to allow garbage collection. - """ - instance = cls._get_instance() - return await instance._close_session() - - @classmethod - async def download_url(cls, url, outfile=None, total_size=None, - progress_callback=None, chunk_callback=None, headers=None, - speed_callback=None): + async def _download_url_async(cls, url, outfile=None, total_size=None, + progress_callback=None, chunk_callback=None, headers=None, + speed_callback=None): """Download a URL with flexible output modes. - This async download function can be used in 3 ways: - - with just a url => returns the content - - with a url and an outfile => writes the content to the outfile - - with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk - Args: - url (str): URL to download + url (str): URL to download (required) outfile (str, optional): Path to write file. If None, returns bytes. total_size (int, optional): Expected size in bytes for progress tracking. - If None, uses Content-Length header or defaults to 100KB. progress_callback (coroutine, optional): async def callback(percent: float) - Called with progress 0.00-100.00 (2 decimal places). - Only called when progress changes by at least 0.01%. chunk_callback (coroutine, optional): async def callback(chunk: bytes) - Called for each chunk. Cannot use with outfile. headers (dict, optional): HTTP headers (e.g., {'Range': 'bytes=1000-'}) speed_callback (coroutine, optional): async def callback(bytes_per_second: float) - Called periodically (every ~1 second) with download speed. Returns: bytes: Downloaded content (if outfile and chunk_callback are None) bool: True if successful (when using outfile or chunk_callback) Raises: - ImportError: If aiohttp module is not available - RuntimeError: If HTTP request fails (status code < 200 or >= 400) - OSError: If chunk download times out after retries or network connection is lost ValueError: If both outfile and chunk_callback are provided - Exception: Other download errors (propagated from aiohttp or chunk processing) - - Example: - # Download to memory - data = await DownloadManager.download_url("https://example.com/file.json") - - # Download to file with progress and speed - async def on_progress(percent): - print(f"Progress: {percent:.2f}%") - - async def on_speed(bps): - print(f"Speed: {bps / 1024:.1f} KB/s") - - success = await DownloadManager.download_url( - "https://example.com/large.bin", - outfile="/sdcard/large.bin", - progress_callback=on_progress, - speed_callback=on_speed - ) - - # Stream processing - async def on_chunk(chunk): - process(chunk) - - success = await DownloadManager.download_url( - "https://example.com/stream", - chunk_callback=on_chunk - ) - """ - instance = cls._get_instance() - return await instance._download_url( - url, outfile=outfile, total_size=total_size, - progress_callback=progress_callback, chunk_callback=chunk_callback, - headers=headers, speed_callback=speed_callback - ) - - @staticmethod - def is_network_error(exception): - """Check if exception is a recoverable network error. - - Recognizes common network error codes and messages that indicate - temporary connectivity issues that can be retried. - - Args: - exception: Exception to check - - Returns: - bool: True if this is a network error that can be retried - - Example: - try: - await DownloadManager.download_url(url) - except Exception as e: - if DownloadManager.is_network_error(e): - # Retry or pause - await asyncio.sleep(2) - # retry... - else: - # Fatal error - raise - """ - error_str = str(exception).lower() - error_repr = repr(exception).lower() - - # Common network error codes and messages - # -113 = ECONNABORTED (connection aborted) - actually 103 - # -104 = ECONNRESET (connection reset by peer) - correct - # -110 = ETIMEDOUT (connection timed out) - correct - # -118 = EHOSTUNREACH (no route to host) - actually 113 - # -202 = DNS/connection error (network not ready) - # - # See lvgl_micropython/lib/esp-idf/components/lwip/lwip/src/include/lwip/errno.h - network_indicators = [ - '-113', '-104', '-110', '-118', '-202', # Error codes - 'econnaborted', 'econnreset', 'etimedout', 'ehostunreach', # Error names - 'connection reset', 'connection aborted', # Error messages - 'broken pipe', 'network unreachable', 'host unreachable', - 'failed to download chunk' # From download_manager OSError(-110) - ] - - return any(indicator in error_str or indicator in error_repr - for indicator in network_indicators) - - @staticmethod - def get_resume_position(outfile): - """Get the current size of a partially downloaded file. - - Useful for implementing resume functionality with Range headers. - - Args: - outfile: Path to file - - Returns: - int: File size in bytes, or 0 if file doesn't exist - - Example: - resume_from = DownloadManager.get_resume_position("/sdcard/file.bin") - if resume_from > 0: - headers = {'Range': f'bytes={resume_from}-'} - await DownloadManager.download_url(url, outfile=outfile, headers=headers) - """ - try: - import os - return os.stat(outfile)[6] # st_size - except OSError: - return 0 - - async def _download_url(self, url, outfile=None, total_size=None, - progress_callback=None, chunk_callback=None, headers=None, - speed_callback=None): - """Download a URL with flexible output modes (instance method). - - This async download function can be used in 3 ways: - - with just a url => returns the content - - with a url and an outfile => writes the content to the outfile - - with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk - - Args: - url (str): URL to download - outfile (str, optional): Path to write file. If None, returns bytes. - total_size (int, optional): Expected size in bytes for progress tracking. - If None, uses Content-Length header or defaults to 100KB. - progress_callback (coroutine, optional): async def callback(percent: float) - Called with progress 0.00-100.00 (2 decimal places). - Only called when progress changes by at least 0.01%. - chunk_callback (coroutine, optional): async def callback(chunk: bytes) - Called for each chunk. Cannot use with outfile. - headers (dict, optional): HTTP headers (e.g., {'Range': 'bytes=1000-'}) - speed_callback (coroutine, optional): async def callback(bytes_per_second: float) - Called periodically (every ~1 second) with download speed. - - Returns: - bytes: Downloaded content (if outfile and chunk_callback are None) - bool: True if successful (when using outfile or chunk_callback) - - Raises: - ImportError: If aiohttp module is not available - RuntimeError: If HTTP request fails (status code < 200 or >= 400) - OSError: If chunk download times out after retries or network connection is lost - ValueError: If both outfile and chunk_callback are provided - Exception: Other download errors (propagated from aiohttp or chunk processing) - - Example: - # Download to memory - data = await DownloadManager.download_url("https://example.com/file.json") - - # Download to file with progress and speed - async def on_progress(percent): - print(f"Progress: {percent:.2f}%") - - async def on_speed(bps): - print(f"Speed: {bps / 1024:.1f} KB/s") - - success = await DownloadManager.download_url( - "https://example.com/large.bin", - outfile="/sdcard/large.bin", - progress_callback=on_progress, - speed_callback=on_speed - ) - - # Stream processing - async def on_chunk(chunk): - process(chunk) - - success = await DownloadManager.download_url( - "https://example.com/stream", - chunk_callback=on_chunk - ) """ # Validate parameters if outfile and chunk_callback: @@ -441,19 +98,15 @@ class DownloadManager: "Use outfile for saving to disk, or chunk_callback for streaming." ) - # Get/create session - session = self._get_session() - if session is None: - print("DownloadManager: Cannot download, aiohttp not available") + # Create a new session for this request + try: + import aiohttp + except ImportError: + print("DownloadManager: aiohttp not available") raise ImportError("aiohttp module not available") - # Increment refcount - if self._session_lock: - self._session_lock.acquire() - self._session_refcount += 1 - if self._session_lock: - self._session_lock.release() - + session = aiohttp.ClientSession() + print("DownloadManager: Created new aiohttp session") print(f"DownloadManager: Downloading {url}") fd = None @@ -603,115 +256,49 @@ class DownloadManager: if fd: fd.close() raise # Re-raise the exception instead of suppressing it - finally: - # Decrement refcount - if self._session_lock: - self._session_lock.acquire() - self._session_refcount -= 1 - if self._session_lock: - self._session_lock.release() - - # Close session if idle - await self._close_session_if_idle() - - -# ============================================================================ -# Smart wrapper: auto-detect async context and run synchronously if needed -# ============================================================================ - -class _DownloadManagerWrapper: - """Smart wrapper that works both sync and async. - - - If called with await in async context: returns coroutine (async) - - If called without await in sync context: runs synchronously (blocking) - - If called with await in sync context: still works (creates event loop) - """ - - def __init__(self, async_class): - """Initialize with reference to async DownloadManager class.""" - self._async_class = async_class - - def download_url(self, url, outfile=None, total_size=None, - progress_callback=None, chunk_callback=None, headers=None, - speed_callback=None): - """Download URL - works both sync and async. - - Async usage (in async function): - data = await DownloadManager.download_url(url) - - Sync usage (in regular function): - data = DownloadManager.download_url(url) # Blocks until complete - """ - # Get the async coroutine - coro = self._async_class.download_url( - url, outfile=outfile, total_size=total_size, - progress_callback=progress_callback, chunk_callback=chunk_callback, - headers=headers, speed_callback=speed_callback - ) - - # Try to detect if we're in an async context - try: - import asyncio - try: - # Check if there's a running task (MicroPython uses current_task()) - asyncio.current_task() - # We're in async context, return the coroutine for await - return coro - except RuntimeError: - # No running task, we're in sync context - # Create a new event loop and run the coroutine - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - return loop.run_until_complete(coro) - finally: - loop.close() - except ImportError: - # asyncio not available, just return coroutine - return coro - - async def close_session(self): - """Close session - works both sync and async. - - Async usage: - await DownloadManager.close_session() - - Sync usage: - DownloadManager.close_session() # Blocks until complete - """ - return await self._async_class.close_session() - - def is_session_active(self): - """Check if session is active (synchronous).""" - return self._async_class.is_session_active() @staticmethod def is_network_error(exception): - """Check if exception is a network error (synchronous).""" - return _original_download_manager.is_network_error(exception) + """Check if exception is a recoverable network error. + + Args: + exception: Exception to check + + Returns: + bool: True if this is a network error that can be retried + """ + error_str = str(exception).lower() + error_repr = repr(exception).lower() + + # Common network error codes and messages + network_indicators = [ + '-113', '-104', '-110', '-118', '-202', # Error codes + 'econnaborted', 'econnreset', 'etimedout', 'ehostunreach', # Error names + 'connection reset', 'connection aborted', # Error messages + 'broken pipe', 'network unreachable', 'host unreachable', + 'failed to download chunk' # From download_manager OSError(-110) + ] + + return any(indicator in error_str or indicator in error_repr + for indicator in network_indicators) - def get_resume_position(self, outfile): - """Get resume position (synchronous).""" - return self._async_class.get_resume_position(outfile) + @staticmethod + def get_resume_position(outfile): + """Get the current size of a partially downloaded file. + + Args: + outfile: Path to file + + Returns: + int: File size in bytes, or 0 if file doesn't exist + """ + try: + import os + return os.stat(outfile)[6] # st_size + except OSError: + return 0 -# ============================================================================ -# Initialize singleton instance (for internal use) -# ============================================================================ - -# Ensure singleton is initialized when module is imported -_instance = DownloadManager._get_instance() - -# Save the original async class -_original_download_manager = DownloadManager - -# Replace with smart wrapper -DownloadManager = _DownloadManagerWrapper(_original_download_manager) - -# ============================================================================ -# Module-level exports for direct import -# ============================================================================ - -# Export utility functions at module level for convenience -is_network_error = _original_download_manager.is_network_error -get_resume_position = _original_download_manager.get_resume_position +# Module-level exports for convenience +is_network_error = DownloadManager.is_network_error +get_resume_position = DownloadManager.get_resume_position diff --git a/tests/test_download_manager.py b/tests/test_download_manager.py index 12345f5c..a7014487 100644 --- a/tests/test_download_manager.py +++ b/tests/test_download_manager.py @@ -34,13 +34,6 @@ class TestDownloadManager(unittest.TestCase): def tearDown(self): """Clean up after each test.""" - # Close any open sessions - import asyncio - try: - asyncio.run(DownloadManager.close_session()) - except Exception: - pass # Session might not be open - # Clean up temp files try: import os @@ -56,13 +49,10 @@ class TestDownloadManager(unittest.TestCase): # ==================== Session Lifecycle Tests ==================== def test_lazy_session_creation(self): - """Test that session is created lazily on first download.""" + """Test that session is created for each download (per-request design).""" import asyncio async def run_test(): - # Verify no session exists initially - self.assertFalse(DownloadManager.is_session_active()) - # Perform a download try: data = await DownloadManager.download_url("https://httpbin.org/bytes/100") @@ -71,9 +61,7 @@ class TestDownloadManager(unittest.TestCase): self.skipTest(f"httpbin.org unavailable: {e}") return - # Verify session was created - # Note: Session may be closed immediately after download if refcount == 0 - # So we can't reliably check is_session_active() here + # Verify download succeeded self.assertIsNotNone(data) self.assertEqual(len(data), 100) @@ -106,35 +94,6 @@ class TestDownloadManager(unittest.TestCase): asyncio.run(run_test()) - def test_explicit_session_close(self): - """Test explicit session closure.""" - import asyncio - - async def run_test(): - # Create session by downloading - try: - data = await DownloadManager.download_url("https://httpbin.org/bytes/10") - except Exception as e: - self.skipTest(f"httpbin.org unavailable: {e}") - return - self.assertIsNotNone(data) - - # Explicitly close session - await DownloadManager.close_session() - - # Verify session is closed - self.assertFalse(DownloadManager.is_session_active()) - - # Verify new download recreates session - try: - data2 = await DownloadManager.download_url("https://httpbin.org/bytes/20") - except Exception as e: - self.skipTest(f"httpbin.org unavailable: {e}") - return - self.assertIsNotNone(data2) - self.assertEqual(len(data2), 20) - - asyncio.run(run_test()) # ==================== Download Mode Tests ==================== @@ -549,11 +508,12 @@ class TestDownloadManager(unittest.TestCase): return self.assertTrue(success) - self.assertTrue(os.path.exists(outfile)) - - # Verify file has content - file_size = os.stat(outfile)[6] - self.assertTrue(file_size > 0) + # Check file exists using os.stat instead of os.path.exists + try: + file_size = os.stat(outfile)[6] + self.assertTrue(file_size > 0) + except OSError: + self.fail("File should exist after successful download") # Verify it's HTML content with open(outfile, 'rb') as f: