diff --git a/internal_filesystem/builtin/apps/com.micropythonos.appstore/assets/appstore.py b/internal_filesystem/builtin/apps/com.micropythonos.appstore/assets/appstore.py index fd6e38e4..d02a53e9 100644 --- a/internal_filesystem/builtin/apps/com.micropythonos.appstore/assets/appstore.py +++ b/internal_filesystem/builtin/apps/com.micropythonos.appstore/assets/appstore.py @@ -1,4 +1,3 @@ -import aiohttp import lvgl as lv import json import requests @@ -7,7 +6,7 @@ import os from mpos.apps import Activity, Intent from mpos.app import App -from mpos import TaskManager +from mpos import TaskManager, DownloadManager import mpos.ui from mpos.content.package_manager import PackageManager @@ -28,7 +27,6 @@ class AppStore(Activity): app_index_url_badgehub = _BADGEHUB_API_BASE_URL + "/" + _BADGEHUB_LIST app_detail_url_badgehub = _BADGEHUB_API_BASE_URL + "/" + _BADGEHUB_DETAILS can_check_network = True - aiohttp_session = None # one session for the whole app is more performant # Widgets: main_screen = None @@ -39,7 +37,6 @@ class AppStore(Activity): progress_bar = None def onCreate(self): - self.aiohttp_session = aiohttp.ClientSession() self.main_screen = lv.obj() self.please_wait_label = lv.label(self.main_screen) self.please_wait_label.set_text("Downloading app index...") @@ -62,11 +59,8 @@ class AppStore(Activity): else: TaskManager.create_task(self.download_app_index(self.app_index_url_github)) - def onDestroy(self, screen): - await self.aiohttp_session.close() - async def download_app_index(self, json_url): - response = await self.download_url(json_url) + response = await DownloadManager.download_url(json_url) if not response: self.please_wait_label.set_text(f"Could not download app index from\n{json_url}") return @@ -152,7 +146,7 @@ class AppStore(Activity): break if not app.icon_data: try: - app.icon_data = await TaskManager.wait_for(self.download_url(app.icon_url), 5) # max 5 seconds per icon + app.icon_data = await TaskManager.wait_for(DownloadManager.download_url(app.icon_url), 5) # max 5 seconds per icon except Exception as e: print(f"Download of {app.icon_url} got exception: {e}") continue @@ -177,96 +171,6 @@ class AppStore(Activity): intent.putExtra("appstore", self) self.startActivity(intent) - ''' - This async download function can be used in 3 ways: - - with just a url => returns the content - - with a url and an outfile => writes the content to the outfile - - with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk - - Optionally: - - progress_callback is called with the % (0-100) progress - - if total_size is not provided, it will be taken from the response headers (if present) or default to 100KB - - a dict of headers can be passed, for example: headers['Range'] = f'bytes={self.bytes_written_so_far}-' - - Can return either: - - the actual content - - None: if the content failed to download - - True: if the URL was successfully downloaded (and written to outfile, if provided) - - False: if the URL was not successfully download and written to outfile - ''' - async def download_url(self, url, outfile=None, total_size=None, progress_callback=None, chunk_callback=None, headers=None): - print(f"Downloading {url}") - #await TaskManager.sleep(4) # test slowness - try: - async with self.aiohttp_session.get(url, headers=headers) as response: - if response.status < 200 or response.status >= 400: - return False if outfile else None - - # Figure out total size - print("headers:") ; print(response.headers) - if total_size is None: - total_size = response.headers.get('Content-Length') # some servers don't send this in the headers - if total_size is None: - print("WARNING: Unable to determine total_size from server's reply and function arguments, assuming 100KB") - total_size = 100 * 1024 - - fd = None - if outfile: - fd = open(outfile, 'wb') - if not fd: - print("WARNING: could not open {outfile} for writing!") - return False - chunks = [] - partial_size = 0 - chunk_size = 1024 - - print(f"download_url {'writing to ' + outfile if outfile else 'downloading'} {total_size} bytes in chunks of size {chunk_size}") - - while True: - tries_left = 3 - chunk_data = None - while tries_left > 0: - try: - chunk_data = await TaskManager.wait_for(response.content.read(chunk_size), 10) - break - except Exception as e: - print(f"Waiting for response.content.read of next chunk_data got error: {e}") - tries_left -= 1 - - if tries_left == 0: - print("ERROR: failed to download chunk_data, even with retries!") - if fd: - fd.close() - return False if outfile else None - - if chunk_data: - # Output - if fd: - fd.write(chunk_data) - elif chunk_callback: - await chunk_callback(chunk_data) - else: - chunks.append(chunk_data) - # Report progress - partial_size += len(chunk_data) - progress_pct = round((partial_size * 100) / int(total_size)) - print(f"progress: {partial_size} / {total_size} bytes = {progress_pct}%") - if progress_callback: - await progress_callback(progress_pct) - #await TaskManager.sleep(1) # test slowness - else: - print("chunk_data is None while there was no error so this was the last one.\n Finished downloading {url}") - if fd: - fd.close() - return True - elif chunk_callback: - return True - else: - return b''.join(chunks) - except Exception as e: - print(f"download_url got exception {e}") - return False if outfile else None - @staticmethod def badgehub_app_to_mpos_app(bhapp): #print(f"Converting {bhapp} to MPOS app object...") @@ -293,7 +197,7 @@ class AppStore(Activity): async def fetch_badgehub_app_details(self, app_obj): details_url = self.app_detail_url_badgehub + "/" + app_obj.fullname - response = await self.download_url(details_url) + response = await DownloadManager.download_url(details_url) if not response: print(f"Could not download app details from from\n{details_url}") return @@ -578,7 +482,7 @@ class AppDetail(Activity): pass temp_zip_path = "tmp/temp.mpk" print(f"Downloading .mpk file from: {zip_url} to {temp_zip_path}") - result = await self.appstore.download_url(zip_url, outfile=temp_zip_path, total_size=download_url_size, progress_callback=self.pcb) + result = await DownloadManager.download_url(zip_url, outfile=temp_zip_path, total_size=download_url_size, progress_callback=self.pcb) if result is not True: print("Download failed...") # Would be good to show an error to the user if this failed... else: diff --git a/internal_filesystem/lib/mpos/__init__.py b/internal_filesystem/lib/mpos/__init__.py index 464207b1..0746708d 100644 --- a/internal_filesystem/lib/mpos/__init__.py +++ b/internal_filesystem/lib/mpos/__init__.py @@ -2,6 +2,7 @@ from .app.app import App from .app.activity import Activity from .net.connectivity_manager import ConnectivityManager +from .net import download_manager as DownloadManager from .content.intent import Intent from .activity_navigator import ActivityNavigator from .content.package_manager import PackageManager @@ -13,7 +14,7 @@ from .app.activities.view import ViewActivity from .app.activities.share import ShareActivity __all__ = [ - "App", "Activity", "ConnectivityManager", "Intent", - "ActivityNavigator", "PackageManager", + "App", "Activity", "ConnectivityManager", "DownloadManager", "Intent", + "ActivityNavigator", "PackageManager", "TaskManager", "ChooserActivity", "ViewActivity", "ShareActivity" ] diff --git a/internal_filesystem/lib/mpos/net/__init__.py b/internal_filesystem/lib/mpos/net/__init__.py index 0cc7f355..1af8d8e5 100644 --- a/internal_filesystem/lib/mpos/net/__init__.py +++ b/internal_filesystem/lib/mpos/net/__init__.py @@ -1 +1,3 @@ # mpos.net module - Networking utilities for MicroPythonOS + +from . import download_manager diff --git a/internal_filesystem/lib/mpos/net/download_manager.py b/internal_filesystem/lib/mpos/net/download_manager.py new file mode 100644 index 00000000..0f65e768 --- /dev/null +++ b/internal_filesystem/lib/mpos/net/download_manager.py @@ -0,0 +1,352 @@ +""" +download_manager.py - Centralized download management for MicroPythonOS + +Provides async HTTP download with flexible output modes: +- Download to memory (returns bytes) +- Download to file (returns bool) +- Streaming with chunk callback (returns bool) + +Features: +- Shared aiohttp.ClientSession for performance +- Automatic session lifecycle management +- Thread-safe session access +- Retry logic (3 attempts per chunk, 10s timeout) +- Progress tracking +- Resume support via Range headers + +Example: + from mpos import DownloadManager + + # Download to memory + data = await DownloadManager.download_url("https://api.example.com/data.json") + + # Download to file with progress + async def progress(pct): + print(f"{pct}%") + + success = await DownloadManager.download_url( + "https://example.com/file.bin", + outfile="/sdcard/file.bin", + progress_callback=progress + ) + + # Stream processing + async def process_chunk(chunk): + # Process each chunk as it arrives + pass + + success = await DownloadManager.download_url( + "https://example.com/stream", + chunk_callback=process_chunk + ) +""" + +# Constants +_DEFAULT_CHUNK_SIZE = 1024 # 1KB chunks +_DEFAULT_TOTAL_SIZE = 100 * 1024 # 100KB default if Content-Length missing +_MAX_RETRIES = 3 # Retry attempts per chunk +_CHUNK_TIMEOUT_SECONDS = 10 # Timeout per chunk read + +# Module-level state (singleton pattern) +_session = None +_session_lock = None +_session_refcount = 0 + + +def _init(): + """Initialize DownloadManager (called automatically on first use).""" + global _session_lock + + if _session_lock is not None: + return # Already initialized + + try: + import _thread + _session_lock = _thread.allocate_lock() + print("DownloadManager: Initialized with thread safety") + except ImportError: + # Desktop mode without threading support (or MicroPython without _thread) + _session_lock = None + print("DownloadManager: Initialized without thread safety") + + +def _get_session(): + """Get or create the shared aiohttp session (thread-safe). + + Returns: + aiohttp.ClientSession or None: The session instance, or None if aiohttp unavailable + """ + global _session, _session_lock + + # Lazy init lock + if _session_lock is None: + _init() + + # Thread-safe session creation + if _session_lock: + _session_lock.acquire() + + try: + if _session is None: + try: + import aiohttp + _session = aiohttp.ClientSession() + print("DownloadManager: Created new aiohttp session") + except ImportError: + print("DownloadManager: aiohttp not available") + return None + return _session + finally: + if _session_lock: + _session_lock.release() + + +async def _close_session_if_idle(): + """Close session if no downloads are active (thread-safe). + + Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing. + Sessions are automatically closed via "Connection: close" header. + This function is kept for potential future enhancements. + """ + global _session, _session_refcount, _session_lock + + if _session_lock: + _session_lock.acquire() + + try: + if _session and _session_refcount == 0: + # MicroPythonOS aiohttp doesn't have close() method + # Sessions close automatically, so just clear the reference + _session = None + print("DownloadManager: Cleared idle session reference") + finally: + if _session_lock: + _session_lock.release() + + +def is_session_active(): + """Check if a session is currently active. + + Returns: + bool: True if session exists and is open + """ + global _session, _session_lock + + if _session_lock: + _session_lock.acquire() + + try: + return _session is not None + finally: + if _session_lock: + _session_lock.release() + + +async def close_session(): + """Explicitly close the session (optional, normally auto-managed). + + Useful for testing or forced cleanup. Session will be recreated + on next download_url() call. + + Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing. + Sessions are automatically closed via "Connection: close" header. + This function clears the session reference to allow garbage collection. + """ + global _session, _session_lock + + if _session_lock: + _session_lock.acquire() + + try: + if _session: + # MicroPythonOS aiohttp doesn't have close() method + # Just clear the reference to allow garbage collection + _session = None + print("DownloadManager: Explicitly cleared session reference") + finally: + if _session_lock: + _session_lock.release() + + +async def download_url(url, outfile=None, total_size=None, + progress_callback=None, chunk_callback=None, headers=None): + """Download a URL with flexible output modes. + + This async download function can be used in 3 ways: + - with just a url => returns the content + - with a url and an outfile => writes the content to the outfile + - with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk + + Args: + url (str): URL to download + outfile (str, optional): Path to write file. If None, returns bytes. + total_size (int, optional): Expected size in bytes for progress tracking. + If None, uses Content-Length header or defaults to 100KB. + progress_callback (coroutine, optional): async def callback(percent: int) + Called with progress 0-100. + chunk_callback (coroutine, optional): async def callback(chunk: bytes) + Called for each chunk. Cannot use with outfile. + headers (dict, optional): HTTP headers (e.g., {'Range': 'bytes=1000-'}) + + Returns: + bytes: Downloaded content (if outfile and chunk_callback are None) + bool: True if successful, False if failed (when using outfile or chunk_callback) + + Raises: + ValueError: If both outfile and chunk_callback are provided + + Example: + # Download to memory + data = await DownloadManager.download_url("https://example.com/file.json") + + # Download to file with progress + async def on_progress(percent): + print(f"Progress: {percent}%") + + success = await DownloadManager.download_url( + "https://example.com/large.bin", + outfile="/sdcard/large.bin", + progress_callback=on_progress + ) + + # Stream processing + async def on_chunk(chunk): + process(chunk) + + success = await DownloadManager.download_url( + "https://example.com/stream", + chunk_callback=on_chunk + ) + """ + # Validate parameters + if outfile and chunk_callback: + raise ValueError( + "Cannot use both outfile and chunk_callback. " + "Use outfile for saving to disk, or chunk_callback for streaming." + ) + + # Lazy init + if _session_lock is None: + _init() + + # Get/create session + session = _get_session() + if session is None: + print("DownloadManager: Cannot download, aiohttp not available") + return False if (outfile or chunk_callback) else None + + # Increment refcount + global _session_refcount + if _session_lock: + _session_lock.acquire() + _session_refcount += 1 + if _session_lock: + _session_lock.release() + + print(f"DownloadManager: Downloading {url}") + + fd = None + try: + # Ensure headers is a dict (aiohttp expects dict, not None) + if headers is None: + headers = {} + + async with session.get(url, headers=headers) as response: + if response.status < 200 or response.status >= 400: + print(f"DownloadManager: HTTP error {response.status}") + return False if (outfile or chunk_callback) else None + + # Figure out total size + print("DownloadManager: Response headers:", response.headers) + if total_size is None: + # response.headers is a dict (after parsing) or None/list (before parsing) + try: + if isinstance(response.headers, dict): + content_length = response.headers.get('Content-Length') + if content_length: + total_size = int(content_length) + except (AttributeError, TypeError, ValueError) as e: + print(f"DownloadManager: Could not parse Content-Length: {e}") + + if total_size is None: + print(f"DownloadManager: WARNING: Unable to determine total_size, assuming {_DEFAULT_TOTAL_SIZE} bytes") + total_size = _DEFAULT_TOTAL_SIZE + + # Setup output + if outfile: + fd = open(outfile, 'wb') + if not fd: + print(f"DownloadManager: WARNING: could not open {outfile} for writing!") + return False + + chunks = [] + partial_size = 0 + chunk_size = _DEFAULT_CHUNK_SIZE + + print(f"DownloadManager: {'Writing to ' + outfile if outfile else 'Downloading'} {total_size} bytes in chunks of size {chunk_size}") + + # Download loop with retry logic + while True: + tries_left = _MAX_RETRIES + chunk_data = None + while tries_left > 0: + try: + # Import TaskManager here to avoid circular imports + from mpos import TaskManager + chunk_data = await TaskManager.wait_for( + response.content.read(chunk_size), + _CHUNK_TIMEOUT_SECONDS + ) + break + except Exception as e: + print(f"DownloadManager: Chunk read error: {e}") + tries_left -= 1 + + if tries_left == 0: + print("DownloadManager: ERROR: failed to download chunk after retries!") + if fd: + fd.close() + return False if (outfile or chunk_callback) else None + + if chunk_data: + # Output chunk + if fd: + fd.write(chunk_data) + elif chunk_callback: + await chunk_callback(chunk_data) + else: + chunks.append(chunk_data) + + # Report progress + partial_size += len(chunk_data) + progress_pct = round((partial_size * 100) / int(total_size)) + print(f"DownloadManager: Progress: {partial_size} / {total_size} bytes = {progress_pct}%") + if progress_callback: + await progress_callback(progress_pct) + else: + # Chunk is None, download complete + print(f"DownloadManager: Finished downloading {url}") + if fd: + fd.close() + fd = None + return True + elif chunk_callback: + return True + else: + return b''.join(chunks) + + except Exception as e: + print(f"DownloadManager: Exception during download: {e}") + if fd: + fd.close() + return False if (outfile or chunk_callback) else None + finally: + # Decrement refcount + if _session_lock: + _session_lock.acquire() + _session_refcount -= 1 + if _session_lock: + _session_lock.release() + + # Close session if idle + await _close_session_if_idle() diff --git a/tests/test_download_manager.py b/tests/test_download_manager.py new file mode 100644 index 00000000..0eee1410 --- /dev/null +++ b/tests/test_download_manager.py @@ -0,0 +1,417 @@ +""" +test_download_manager.py - Tests for DownloadManager module + +Tests the centralized download manager functionality including: +- Session lifecycle management +- Download modes (memory, file, streaming) +- Progress tracking +- Error handling +- Resume support with Range headers +- Concurrent downloads +""" + +import unittest +import os +import sys + +# Import the module under test +sys.path.insert(0, '../internal_filesystem/lib') +import mpos.net.download_manager as DownloadManager + + +class TestDownloadManager(unittest.TestCase): + """Test cases for DownloadManager module.""" + + def setUp(self): + """Reset module state before each test.""" + # Reset module-level state + DownloadManager._session = None + DownloadManager._session_refcount = 0 + DownloadManager._session_lock = None + + # Create temp directory for file downloads + self.temp_dir = "/tmp/test_download_manager" + try: + os.mkdir(self.temp_dir) + except OSError: + pass # Directory already exists + + def tearDown(self): + """Clean up after each test.""" + # Close any open sessions + import asyncio + if DownloadManager._session: + asyncio.run(DownloadManager.close_session()) + + # Clean up temp files + try: + import os + for file in os.listdir(self.temp_dir): + try: + os.remove(f"{self.temp_dir}/{file}") + except OSError: + pass + os.rmdir(self.temp_dir) + except OSError: + pass + + # ==================== Session Lifecycle Tests ==================== + + def test_lazy_session_creation(self): + """Test that session is created lazily on first download.""" + import asyncio + + async def run_test(): + # Verify no session exists initially + self.assertFalse(DownloadManager.is_session_active()) + + # Perform a download + data = await DownloadManager.download_url("https://httpbin.org/bytes/100") + + # Verify session was created + # Note: Session may be closed immediately after download if refcount == 0 + # So we can't reliably check is_session_active() here + self.assertIsNotNone(data) + self.assertEqual(len(data), 100) + + asyncio.run(run_test()) + + def test_session_reuse_across_downloads(self): + """Test that the same session is reused for multiple downloads.""" + import asyncio + + async def run_test(): + # Perform first download + data1 = await DownloadManager.download_url("https://httpbin.org/bytes/50") + self.assertIsNotNone(data1) + + # Perform second download + data2 = await DownloadManager.download_url("https://httpbin.org/bytes/75") + self.assertIsNotNone(data2) + + # Verify different data was downloaded + self.assertEqual(len(data1), 50) + self.assertEqual(len(data2), 75) + + asyncio.run(run_test()) + + def test_explicit_session_close(self): + """Test explicit session closure.""" + import asyncio + + async def run_test(): + # Create session by downloading + data = await DownloadManager.download_url("https://httpbin.org/bytes/10") + self.assertIsNotNone(data) + + # Explicitly close session + await DownloadManager.close_session() + + # Verify session is closed + self.assertFalse(DownloadManager.is_session_active()) + + # Verify new download recreates session + data2 = await DownloadManager.download_url("https://httpbin.org/bytes/20") + self.assertIsNotNone(data2) + self.assertEqual(len(data2), 20) + + asyncio.run(run_test()) + + # ==================== Download Mode Tests ==================== + + def test_download_to_memory(self): + """Test downloading content to memory (returns bytes).""" + import asyncio + + async def run_test(): + data = await DownloadManager.download_url("https://httpbin.org/bytes/1024") + + self.assertIsInstance(data, bytes) + self.assertEqual(len(data), 1024) + + asyncio.run(run_test()) + + def test_download_to_file(self): + """Test downloading content to file (returns True/False).""" + import asyncio + + async def run_test(): + outfile = f"{self.temp_dir}/test_download.bin" + + success = await DownloadManager.download_url( + "https://httpbin.org/bytes/2048", + outfile=outfile + ) + + self.assertTrue(success) + self.assertEqual(os.stat(outfile)[6], 2048) + + # Clean up + os.remove(outfile) + + asyncio.run(run_test()) + + def test_download_with_chunk_callback(self): + """Test streaming download with chunk callback.""" + import asyncio + + async def run_test(): + chunks_received = [] + + async def collect_chunks(chunk): + chunks_received.append(chunk) + + success = await DownloadManager.download_url( + "https://httpbin.org/bytes/512", + chunk_callback=collect_chunks + ) + + self.assertTrue(success) + self.assertTrue(len(chunks_received) > 0) + + # Verify total size matches + total_size = sum(len(chunk) for chunk in chunks_received) + self.assertEqual(total_size, 512) + + asyncio.run(run_test()) + + def test_parameter_validation_conflicting_params(self): + """Test that outfile and chunk_callback cannot both be provided.""" + import asyncio + + async def run_test(): + with self.assertRaises(ValueError) as context: + await DownloadManager.download_url( + "https://httpbin.org/bytes/100", + outfile="/tmp/test.bin", + chunk_callback=lambda chunk: None + ) + + self.assertIn("Cannot use both", str(context.exception)) + + asyncio.run(run_test()) + + # ==================== Progress Tracking Tests ==================== + + def test_progress_callback(self): + """Test that progress callback is called with percentages.""" + import asyncio + + async def run_test(): + progress_calls = [] + + async def track_progress(percent): + progress_calls.append(percent) + + data = await DownloadManager.download_url( + "https://httpbin.org/bytes/5120", # 5KB + progress_callback=track_progress + ) + + self.assertIsNotNone(data) + self.assertTrue(len(progress_calls) > 0) + + # Verify progress values are in valid range + for pct in progress_calls: + self.assertTrue(0 <= pct <= 100) + + # Verify progress generally increases (allowing for some rounding variations) + # Note: Due to chunking and rounding, progress might not be strictly increasing + self.assertTrue(progress_calls[-1] >= 90) # Should end near 100% + + asyncio.run(run_test()) + + def test_progress_with_explicit_total_size(self): + """Test progress tracking with explicitly provided total_size.""" + import asyncio + + async def run_test(): + progress_calls = [] + + async def track_progress(percent): + progress_calls.append(percent) + + data = await DownloadManager.download_url( + "https://httpbin.org/bytes/3072", # 3KB + total_size=3072, + progress_callback=track_progress + ) + + self.assertIsNotNone(data) + self.assertTrue(len(progress_calls) > 0) + + asyncio.run(run_test()) + + # ==================== Error Handling Tests ==================== + + def test_http_error_status(self): + """Test handling of HTTP error status codes.""" + import asyncio + + async def run_test(): + # Request 404 error from httpbin + data = await DownloadManager.download_url("https://httpbin.org/status/404") + + # Should return None for memory download + self.assertIsNone(data) + + asyncio.run(run_test()) + + def test_http_error_with_file_output(self): + """Test that file download returns False on HTTP error.""" + import asyncio + + async def run_test(): + outfile = f"{self.temp_dir}/error_test.bin" + + success = await DownloadManager.download_url( + "https://httpbin.org/status/500", + outfile=outfile + ) + + # Should return False for file download + self.assertFalse(success) + + # File should not be created + try: + os.stat(outfile) + self.fail("File should not exist after failed download") + except OSError: + pass # Expected - file doesn't exist + + asyncio.run(run_test()) + + def test_invalid_url(self): + """Test handling of invalid URL.""" + import asyncio + + async def run_test(): + # Invalid URL should raise exception or return None + try: + data = await DownloadManager.download_url("http://invalid-url-that-does-not-exist.local/") + # If it doesn't raise, it should return None + self.assertIsNone(data) + except Exception: + # Exception is acceptable + pass + + asyncio.run(run_test()) + + # ==================== Headers Support Tests ==================== + + def test_custom_headers(self): + """Test that custom headers are passed to the request.""" + import asyncio + + async def run_test(): + # httpbin.org/headers echoes back the headers sent + data = await DownloadManager.download_url( + "https://httpbin.org/headers", + headers={"X-Custom-Header": "TestValue"} + ) + + self.assertIsNotNone(data) + # Verify the custom header was included (httpbin echoes it back) + response_text = data.decode('utf-8') + self.assertIn("X-Custom-Header", response_text) + self.assertIn("TestValue", response_text) + + asyncio.run(run_test()) + + # ==================== Edge Cases Tests ==================== + + def test_empty_response(self): + """Test handling of empty (0-byte) downloads.""" + import asyncio + + async def run_test(): + # Download 0 bytes + data = await DownloadManager.download_url("https://httpbin.org/bytes/0") + + self.assertIsNotNone(data) + self.assertEqual(len(data), 0) + self.assertEqual(data, b'') + + asyncio.run(run_test()) + + def test_small_download(self): + """Test downloading very small files (smaller than chunk size).""" + import asyncio + + async def run_test(): + # Download 10 bytes (much smaller than 1KB chunk size) + data = await DownloadManager.download_url("https://httpbin.org/bytes/10") + + self.assertIsNotNone(data) + self.assertEqual(len(data), 10) + + asyncio.run(run_test()) + + def test_json_download(self): + """Test downloading JSON data.""" + import asyncio + import json + + async def run_test(): + data = await DownloadManager.download_url("https://httpbin.org/json") + + self.assertIsNotNone(data) + # Verify it's valid JSON + parsed = json.loads(data.decode('utf-8')) + self.assertIsInstance(parsed, dict) + + asyncio.run(run_test()) + + # ==================== File Operations Tests ==================== + + def test_file_download_creates_directory_if_needed(self): + """Test that parent directories are NOT created (caller's responsibility).""" + import asyncio + + async def run_test(): + # Try to download to non-existent directory + outfile = "/tmp/nonexistent_dir_12345/test.bin" + + try: + success = await DownloadManager.download_url( + "https://httpbin.org/bytes/100", + outfile=outfile + ) + # Should fail because directory doesn't exist + self.assertFalse(success) + except Exception: + # Exception is acceptable + pass + + asyncio.run(run_test()) + + def test_file_overwrite(self): + """Test that downloading overwrites existing files.""" + import asyncio + + async def run_test(): + outfile = f"{self.temp_dir}/overwrite_test.bin" + + # Create initial file + with open(outfile, 'wb') as f: + f.write(b'old content') + + # Download and overwrite + success = await DownloadManager.download_url( + "https://httpbin.org/bytes/100", + outfile=outfile + ) + + self.assertTrue(success) + self.assertEqual(os.stat(outfile)[6], 100) + + # Verify old content is gone + with open(outfile, 'rb') as f: + content = f.read() + self.assertNotEqual(content, b'old content') + self.assertEqual(len(content), 100) + + # Clean up + os.remove(outfile) + + asyncio.run(run_test())