Move download_url() to DownloadManager

This commit is contained in:
Thomas Farstrike
2025-12-17 12:26:02 +01:00
parent 7cdea5fe65
commit 5dd24090f4
5 changed files with 779 additions and 103 deletions
@@ -1,4 +1,3 @@
import aiohttp
import lvgl as lv
import json
import requests
@@ -7,7 +6,7 @@ import os
from mpos.apps import Activity, Intent
from mpos.app import App
from mpos import TaskManager
from mpos import TaskManager, DownloadManager
import mpos.ui
from mpos.content.package_manager import PackageManager
@@ -28,7 +27,6 @@ class AppStore(Activity):
app_index_url_badgehub = _BADGEHUB_API_BASE_URL + "/" + _BADGEHUB_LIST
app_detail_url_badgehub = _BADGEHUB_API_BASE_URL + "/" + _BADGEHUB_DETAILS
can_check_network = True
aiohttp_session = None # one session for the whole app is more performant
# Widgets:
main_screen = None
@@ -39,7 +37,6 @@ class AppStore(Activity):
progress_bar = None
def onCreate(self):
self.aiohttp_session = aiohttp.ClientSession()
self.main_screen = lv.obj()
self.please_wait_label = lv.label(self.main_screen)
self.please_wait_label.set_text("Downloading app index...")
@@ -62,11 +59,8 @@ class AppStore(Activity):
else:
TaskManager.create_task(self.download_app_index(self.app_index_url_github))
def onDestroy(self, screen):
await self.aiohttp_session.close()
async def download_app_index(self, json_url):
response = await self.download_url(json_url)
response = await DownloadManager.download_url(json_url)
if not response:
self.please_wait_label.set_text(f"Could not download app index from\n{json_url}")
return
@@ -152,7 +146,7 @@ class AppStore(Activity):
break
if not app.icon_data:
try:
app.icon_data = await TaskManager.wait_for(self.download_url(app.icon_url), 5) # max 5 seconds per icon
app.icon_data = await TaskManager.wait_for(DownloadManager.download_url(app.icon_url), 5) # max 5 seconds per icon
except Exception as e:
print(f"Download of {app.icon_url} got exception: {e}")
continue
@@ -177,96 +171,6 @@ class AppStore(Activity):
intent.putExtra("appstore", self)
self.startActivity(intent)
'''
This async download function can be used in 3 ways:
- with just a url => returns the content
- with a url and an outfile => writes the content to the outfile
- with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk
Optionally:
- progress_callback is called with the % (0-100) progress
- if total_size is not provided, it will be taken from the response headers (if present) or default to 100KB
- a dict of headers can be passed, for example: headers['Range'] = f'bytes={self.bytes_written_so_far}-'
Can return either:
- the actual content
- None: if the content failed to download
- True: if the URL was successfully downloaded (and written to outfile, if provided)
- False: if the URL was not successfully download and written to outfile
'''
async def download_url(self, url, outfile=None, total_size=None, progress_callback=None, chunk_callback=None, headers=None):
print(f"Downloading {url}")
#await TaskManager.sleep(4) # test slowness
try:
async with self.aiohttp_session.get(url, headers=headers) as response:
if response.status < 200 or response.status >= 400:
return False if outfile else None
# Figure out total size
print("headers:") ; print(response.headers)
if total_size is None:
total_size = response.headers.get('Content-Length') # some servers don't send this in the headers
if total_size is None:
print("WARNING: Unable to determine total_size from server's reply and function arguments, assuming 100KB")
total_size = 100 * 1024
fd = None
if outfile:
fd = open(outfile, 'wb')
if not fd:
print("WARNING: could not open {outfile} for writing!")
return False
chunks = []
partial_size = 0
chunk_size = 1024
print(f"download_url {'writing to ' + outfile if outfile else 'downloading'} {total_size} bytes in chunks of size {chunk_size}")
while True:
tries_left = 3
chunk_data = None
while tries_left > 0:
try:
chunk_data = await TaskManager.wait_for(response.content.read(chunk_size), 10)
break
except Exception as e:
print(f"Waiting for response.content.read of next chunk_data got error: {e}")
tries_left -= 1
if tries_left == 0:
print("ERROR: failed to download chunk_data, even with retries!")
if fd:
fd.close()
return False if outfile else None
if chunk_data:
# Output
if fd:
fd.write(chunk_data)
elif chunk_callback:
await chunk_callback(chunk_data)
else:
chunks.append(chunk_data)
# Report progress
partial_size += len(chunk_data)
progress_pct = round((partial_size * 100) / int(total_size))
print(f"progress: {partial_size} / {total_size} bytes = {progress_pct}%")
if progress_callback:
await progress_callback(progress_pct)
#await TaskManager.sleep(1) # test slowness
else:
print("chunk_data is None while there was no error so this was the last one.\n Finished downloading {url}")
if fd:
fd.close()
return True
elif chunk_callback:
return True
else:
return b''.join(chunks)
except Exception as e:
print(f"download_url got exception {e}")
return False if outfile else None
@staticmethod
def badgehub_app_to_mpos_app(bhapp):
#print(f"Converting {bhapp} to MPOS app object...")
@@ -293,7 +197,7 @@ class AppStore(Activity):
async def fetch_badgehub_app_details(self, app_obj):
details_url = self.app_detail_url_badgehub + "/" + app_obj.fullname
response = await self.download_url(details_url)
response = await DownloadManager.download_url(details_url)
if not response:
print(f"Could not download app details from from\n{details_url}")
return
@@ -578,7 +482,7 @@ class AppDetail(Activity):
pass
temp_zip_path = "tmp/temp.mpk"
print(f"Downloading .mpk file from: {zip_url} to {temp_zip_path}")
result = await self.appstore.download_url(zip_url, outfile=temp_zip_path, total_size=download_url_size, progress_callback=self.pcb)
result = await DownloadManager.download_url(zip_url, outfile=temp_zip_path, total_size=download_url_size, progress_callback=self.pcb)
if result is not True:
print("Download failed...") # Would be good to show an error to the user if this failed...
else:
+3 -2
View File
@@ -2,6 +2,7 @@
from .app.app import App
from .app.activity import Activity
from .net.connectivity_manager import ConnectivityManager
from .net import download_manager as DownloadManager
from .content.intent import Intent
from .activity_navigator import ActivityNavigator
from .content.package_manager import PackageManager
@@ -13,7 +14,7 @@ from .app.activities.view import ViewActivity
from .app.activities.share import ShareActivity
__all__ = [
"App", "Activity", "ConnectivityManager", "Intent",
"ActivityNavigator", "PackageManager",
"App", "Activity", "ConnectivityManager", "DownloadManager", "Intent",
"ActivityNavigator", "PackageManager", "TaskManager",
"ChooserActivity", "ViewActivity", "ShareActivity"
]
@@ -1 +1,3 @@
# mpos.net module - Networking utilities for MicroPythonOS
from . import download_manager
@@ -0,0 +1,352 @@
"""
download_manager.py - Centralized download management for MicroPythonOS
Provides async HTTP download with flexible output modes:
- Download to memory (returns bytes)
- Download to file (returns bool)
- Streaming with chunk callback (returns bool)
Features:
- Shared aiohttp.ClientSession for performance
- Automatic session lifecycle management
- Thread-safe session access
- Retry logic (3 attempts per chunk, 10s timeout)
- Progress tracking
- Resume support via Range headers
Example:
from mpos import DownloadManager
# Download to memory
data = await DownloadManager.download_url("https://api.example.com/data.json")
# Download to file with progress
async def progress(pct):
print(f"{pct}%")
success = await DownloadManager.download_url(
"https://example.com/file.bin",
outfile="/sdcard/file.bin",
progress_callback=progress
)
# Stream processing
async def process_chunk(chunk):
# Process each chunk as it arrives
pass
success = await DownloadManager.download_url(
"https://example.com/stream",
chunk_callback=process_chunk
)
"""
# Constants
_DEFAULT_CHUNK_SIZE = 1024 # 1KB chunks
_DEFAULT_TOTAL_SIZE = 100 * 1024 # 100KB default if Content-Length missing
_MAX_RETRIES = 3 # Retry attempts per chunk
_CHUNK_TIMEOUT_SECONDS = 10 # Timeout per chunk read
# Module-level state (singleton pattern)
_session = None
_session_lock = None
_session_refcount = 0
def _init():
"""Initialize DownloadManager (called automatically on first use)."""
global _session_lock
if _session_lock is not None:
return # Already initialized
try:
import _thread
_session_lock = _thread.allocate_lock()
print("DownloadManager: Initialized with thread safety")
except ImportError:
# Desktop mode without threading support (or MicroPython without _thread)
_session_lock = None
print("DownloadManager: Initialized without thread safety")
def _get_session():
"""Get or create the shared aiohttp session (thread-safe).
Returns:
aiohttp.ClientSession or None: The session instance, or None if aiohttp unavailable
"""
global _session, _session_lock
# Lazy init lock
if _session_lock is None:
_init()
# Thread-safe session creation
if _session_lock:
_session_lock.acquire()
try:
if _session is None:
try:
import aiohttp
_session = aiohttp.ClientSession()
print("DownloadManager: Created new aiohttp session")
except ImportError:
print("DownloadManager: aiohttp not available")
return None
return _session
finally:
if _session_lock:
_session_lock.release()
async def _close_session_if_idle():
"""Close session if no downloads are active (thread-safe).
Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing.
Sessions are automatically closed via "Connection: close" header.
This function is kept for potential future enhancements.
"""
global _session, _session_refcount, _session_lock
if _session_lock:
_session_lock.acquire()
try:
if _session and _session_refcount == 0:
# MicroPythonOS aiohttp doesn't have close() method
# Sessions close automatically, so just clear the reference
_session = None
print("DownloadManager: Cleared idle session reference")
finally:
if _session_lock:
_session_lock.release()
def is_session_active():
"""Check if a session is currently active.
Returns:
bool: True if session exists and is open
"""
global _session, _session_lock
if _session_lock:
_session_lock.acquire()
try:
return _session is not None
finally:
if _session_lock:
_session_lock.release()
async def close_session():
"""Explicitly close the session (optional, normally auto-managed).
Useful for testing or forced cleanup. Session will be recreated
on next download_url() call.
Note: MicroPythonOS aiohttp implementation doesn't require explicit session closing.
Sessions are automatically closed via "Connection: close" header.
This function clears the session reference to allow garbage collection.
"""
global _session, _session_lock
if _session_lock:
_session_lock.acquire()
try:
if _session:
# MicroPythonOS aiohttp doesn't have close() method
# Just clear the reference to allow garbage collection
_session = None
print("DownloadManager: Explicitly cleared session reference")
finally:
if _session_lock:
_session_lock.release()
async def download_url(url, outfile=None, total_size=None,
progress_callback=None, chunk_callback=None, headers=None):
"""Download a URL with flexible output modes.
This async download function can be used in 3 ways:
- with just a url => returns the content
- with a url and an outfile => writes the content to the outfile
- with a url and a chunk_callback => calls the chunk_callback(chunk_data) for each chunk
Args:
url (str): URL to download
outfile (str, optional): Path to write file. If None, returns bytes.
total_size (int, optional): Expected size in bytes for progress tracking.
If None, uses Content-Length header or defaults to 100KB.
progress_callback (coroutine, optional): async def callback(percent: int)
Called with progress 0-100.
chunk_callback (coroutine, optional): async def callback(chunk: bytes)
Called for each chunk. Cannot use with outfile.
headers (dict, optional): HTTP headers (e.g., {'Range': 'bytes=1000-'})
Returns:
bytes: Downloaded content (if outfile and chunk_callback are None)
bool: True if successful, False if failed (when using outfile or chunk_callback)
Raises:
ValueError: If both outfile and chunk_callback are provided
Example:
# Download to memory
data = await DownloadManager.download_url("https://example.com/file.json")
# Download to file with progress
async def on_progress(percent):
print(f"Progress: {percent}%")
success = await DownloadManager.download_url(
"https://example.com/large.bin",
outfile="/sdcard/large.bin",
progress_callback=on_progress
)
# Stream processing
async def on_chunk(chunk):
process(chunk)
success = await DownloadManager.download_url(
"https://example.com/stream",
chunk_callback=on_chunk
)
"""
# Validate parameters
if outfile and chunk_callback:
raise ValueError(
"Cannot use both outfile and chunk_callback. "
"Use outfile for saving to disk, or chunk_callback for streaming."
)
# Lazy init
if _session_lock is None:
_init()
# Get/create session
session = _get_session()
if session is None:
print("DownloadManager: Cannot download, aiohttp not available")
return False if (outfile or chunk_callback) else None
# Increment refcount
global _session_refcount
if _session_lock:
_session_lock.acquire()
_session_refcount += 1
if _session_lock:
_session_lock.release()
print(f"DownloadManager: Downloading {url}")
fd = None
try:
# Ensure headers is a dict (aiohttp expects dict, not None)
if headers is None:
headers = {}
async with session.get(url, headers=headers) as response:
if response.status < 200 or response.status >= 400:
print(f"DownloadManager: HTTP error {response.status}")
return False if (outfile or chunk_callback) else None
# Figure out total size
print("DownloadManager: Response headers:", response.headers)
if total_size is None:
# response.headers is a dict (after parsing) or None/list (before parsing)
try:
if isinstance(response.headers, dict):
content_length = response.headers.get('Content-Length')
if content_length:
total_size = int(content_length)
except (AttributeError, TypeError, ValueError) as e:
print(f"DownloadManager: Could not parse Content-Length: {e}")
if total_size is None:
print(f"DownloadManager: WARNING: Unable to determine total_size, assuming {_DEFAULT_TOTAL_SIZE} bytes")
total_size = _DEFAULT_TOTAL_SIZE
# Setup output
if outfile:
fd = open(outfile, 'wb')
if not fd:
print(f"DownloadManager: WARNING: could not open {outfile} for writing!")
return False
chunks = []
partial_size = 0
chunk_size = _DEFAULT_CHUNK_SIZE
print(f"DownloadManager: {'Writing to ' + outfile if outfile else 'Downloading'} {total_size} bytes in chunks of size {chunk_size}")
# Download loop with retry logic
while True:
tries_left = _MAX_RETRIES
chunk_data = None
while tries_left > 0:
try:
# Import TaskManager here to avoid circular imports
from mpos import TaskManager
chunk_data = await TaskManager.wait_for(
response.content.read(chunk_size),
_CHUNK_TIMEOUT_SECONDS
)
break
except Exception as e:
print(f"DownloadManager: Chunk read error: {e}")
tries_left -= 1
if tries_left == 0:
print("DownloadManager: ERROR: failed to download chunk after retries!")
if fd:
fd.close()
return False if (outfile or chunk_callback) else None
if chunk_data:
# Output chunk
if fd:
fd.write(chunk_data)
elif chunk_callback:
await chunk_callback(chunk_data)
else:
chunks.append(chunk_data)
# Report progress
partial_size += len(chunk_data)
progress_pct = round((partial_size * 100) / int(total_size))
print(f"DownloadManager: Progress: {partial_size} / {total_size} bytes = {progress_pct}%")
if progress_callback:
await progress_callback(progress_pct)
else:
# Chunk is None, download complete
print(f"DownloadManager: Finished downloading {url}")
if fd:
fd.close()
fd = None
return True
elif chunk_callback:
return True
else:
return b''.join(chunks)
except Exception as e:
print(f"DownloadManager: Exception during download: {e}")
if fd:
fd.close()
return False if (outfile or chunk_callback) else None
finally:
# Decrement refcount
if _session_lock:
_session_lock.acquire()
_session_refcount -= 1
if _session_lock:
_session_lock.release()
# Close session if idle
await _close_session_if_idle()
+417
View File
@@ -0,0 +1,417 @@
"""
test_download_manager.py - Tests for DownloadManager module
Tests the centralized download manager functionality including:
- Session lifecycle management
- Download modes (memory, file, streaming)
- Progress tracking
- Error handling
- Resume support with Range headers
- Concurrent downloads
"""
import unittest
import os
import sys
# Import the module under test
sys.path.insert(0, '../internal_filesystem/lib')
import mpos.net.download_manager as DownloadManager
class TestDownloadManager(unittest.TestCase):
"""Test cases for DownloadManager module."""
def setUp(self):
"""Reset module state before each test."""
# Reset module-level state
DownloadManager._session = None
DownloadManager._session_refcount = 0
DownloadManager._session_lock = None
# Create temp directory for file downloads
self.temp_dir = "/tmp/test_download_manager"
try:
os.mkdir(self.temp_dir)
except OSError:
pass # Directory already exists
def tearDown(self):
"""Clean up after each test."""
# Close any open sessions
import asyncio
if DownloadManager._session:
asyncio.run(DownloadManager.close_session())
# Clean up temp files
try:
import os
for file in os.listdir(self.temp_dir):
try:
os.remove(f"{self.temp_dir}/{file}")
except OSError:
pass
os.rmdir(self.temp_dir)
except OSError:
pass
# ==================== Session Lifecycle Tests ====================
def test_lazy_session_creation(self):
"""Test that session is created lazily on first download."""
import asyncio
async def run_test():
# Verify no session exists initially
self.assertFalse(DownloadManager.is_session_active())
# Perform a download
data = await DownloadManager.download_url("https://httpbin.org/bytes/100")
# Verify session was created
# Note: Session may be closed immediately after download if refcount == 0
# So we can't reliably check is_session_active() here
self.assertIsNotNone(data)
self.assertEqual(len(data), 100)
asyncio.run(run_test())
def test_session_reuse_across_downloads(self):
"""Test that the same session is reused for multiple downloads."""
import asyncio
async def run_test():
# Perform first download
data1 = await DownloadManager.download_url("https://httpbin.org/bytes/50")
self.assertIsNotNone(data1)
# Perform second download
data2 = await DownloadManager.download_url("https://httpbin.org/bytes/75")
self.assertIsNotNone(data2)
# Verify different data was downloaded
self.assertEqual(len(data1), 50)
self.assertEqual(len(data2), 75)
asyncio.run(run_test())
def test_explicit_session_close(self):
"""Test explicit session closure."""
import asyncio
async def run_test():
# Create session by downloading
data = await DownloadManager.download_url("https://httpbin.org/bytes/10")
self.assertIsNotNone(data)
# Explicitly close session
await DownloadManager.close_session()
# Verify session is closed
self.assertFalse(DownloadManager.is_session_active())
# Verify new download recreates session
data2 = await DownloadManager.download_url("https://httpbin.org/bytes/20")
self.assertIsNotNone(data2)
self.assertEqual(len(data2), 20)
asyncio.run(run_test())
# ==================== Download Mode Tests ====================
def test_download_to_memory(self):
"""Test downloading content to memory (returns bytes)."""
import asyncio
async def run_test():
data = await DownloadManager.download_url("https://httpbin.org/bytes/1024")
self.assertIsInstance(data, bytes)
self.assertEqual(len(data), 1024)
asyncio.run(run_test())
def test_download_to_file(self):
"""Test downloading content to file (returns True/False)."""
import asyncio
async def run_test():
outfile = f"{self.temp_dir}/test_download.bin"
success = await DownloadManager.download_url(
"https://httpbin.org/bytes/2048",
outfile=outfile
)
self.assertTrue(success)
self.assertEqual(os.stat(outfile)[6], 2048)
# Clean up
os.remove(outfile)
asyncio.run(run_test())
def test_download_with_chunk_callback(self):
"""Test streaming download with chunk callback."""
import asyncio
async def run_test():
chunks_received = []
async def collect_chunks(chunk):
chunks_received.append(chunk)
success = await DownloadManager.download_url(
"https://httpbin.org/bytes/512",
chunk_callback=collect_chunks
)
self.assertTrue(success)
self.assertTrue(len(chunks_received) > 0)
# Verify total size matches
total_size = sum(len(chunk) for chunk in chunks_received)
self.assertEqual(total_size, 512)
asyncio.run(run_test())
def test_parameter_validation_conflicting_params(self):
"""Test that outfile and chunk_callback cannot both be provided."""
import asyncio
async def run_test():
with self.assertRaises(ValueError) as context:
await DownloadManager.download_url(
"https://httpbin.org/bytes/100",
outfile="/tmp/test.bin",
chunk_callback=lambda chunk: None
)
self.assertIn("Cannot use both", str(context.exception))
asyncio.run(run_test())
# ==================== Progress Tracking Tests ====================
def test_progress_callback(self):
"""Test that progress callback is called with percentages."""
import asyncio
async def run_test():
progress_calls = []
async def track_progress(percent):
progress_calls.append(percent)
data = await DownloadManager.download_url(
"https://httpbin.org/bytes/5120", # 5KB
progress_callback=track_progress
)
self.assertIsNotNone(data)
self.assertTrue(len(progress_calls) > 0)
# Verify progress values are in valid range
for pct in progress_calls:
self.assertTrue(0 <= pct <= 100)
# Verify progress generally increases (allowing for some rounding variations)
# Note: Due to chunking and rounding, progress might not be strictly increasing
self.assertTrue(progress_calls[-1] >= 90) # Should end near 100%
asyncio.run(run_test())
def test_progress_with_explicit_total_size(self):
"""Test progress tracking with explicitly provided total_size."""
import asyncio
async def run_test():
progress_calls = []
async def track_progress(percent):
progress_calls.append(percent)
data = await DownloadManager.download_url(
"https://httpbin.org/bytes/3072", # 3KB
total_size=3072,
progress_callback=track_progress
)
self.assertIsNotNone(data)
self.assertTrue(len(progress_calls) > 0)
asyncio.run(run_test())
# ==================== Error Handling Tests ====================
def test_http_error_status(self):
"""Test handling of HTTP error status codes."""
import asyncio
async def run_test():
# Request 404 error from httpbin
data = await DownloadManager.download_url("https://httpbin.org/status/404")
# Should return None for memory download
self.assertIsNone(data)
asyncio.run(run_test())
def test_http_error_with_file_output(self):
"""Test that file download returns False on HTTP error."""
import asyncio
async def run_test():
outfile = f"{self.temp_dir}/error_test.bin"
success = await DownloadManager.download_url(
"https://httpbin.org/status/500",
outfile=outfile
)
# Should return False for file download
self.assertFalse(success)
# File should not be created
try:
os.stat(outfile)
self.fail("File should not exist after failed download")
except OSError:
pass # Expected - file doesn't exist
asyncio.run(run_test())
def test_invalid_url(self):
"""Test handling of invalid URL."""
import asyncio
async def run_test():
# Invalid URL should raise exception or return None
try:
data = await DownloadManager.download_url("http://invalid-url-that-does-not-exist.local/")
# If it doesn't raise, it should return None
self.assertIsNone(data)
except Exception:
# Exception is acceptable
pass
asyncio.run(run_test())
# ==================== Headers Support Tests ====================
def test_custom_headers(self):
"""Test that custom headers are passed to the request."""
import asyncio
async def run_test():
# httpbin.org/headers echoes back the headers sent
data = await DownloadManager.download_url(
"https://httpbin.org/headers",
headers={"X-Custom-Header": "TestValue"}
)
self.assertIsNotNone(data)
# Verify the custom header was included (httpbin echoes it back)
response_text = data.decode('utf-8')
self.assertIn("X-Custom-Header", response_text)
self.assertIn("TestValue", response_text)
asyncio.run(run_test())
# ==================== Edge Cases Tests ====================
def test_empty_response(self):
"""Test handling of empty (0-byte) downloads."""
import asyncio
async def run_test():
# Download 0 bytes
data = await DownloadManager.download_url("https://httpbin.org/bytes/0")
self.assertIsNotNone(data)
self.assertEqual(len(data), 0)
self.assertEqual(data, b'')
asyncio.run(run_test())
def test_small_download(self):
"""Test downloading very small files (smaller than chunk size)."""
import asyncio
async def run_test():
# Download 10 bytes (much smaller than 1KB chunk size)
data = await DownloadManager.download_url("https://httpbin.org/bytes/10")
self.assertIsNotNone(data)
self.assertEqual(len(data), 10)
asyncio.run(run_test())
def test_json_download(self):
"""Test downloading JSON data."""
import asyncio
import json
async def run_test():
data = await DownloadManager.download_url("https://httpbin.org/json")
self.assertIsNotNone(data)
# Verify it's valid JSON
parsed = json.loads(data.decode('utf-8'))
self.assertIsInstance(parsed, dict)
asyncio.run(run_test())
# ==================== File Operations Tests ====================
def test_file_download_creates_directory_if_needed(self):
"""Test that parent directories are NOT created (caller's responsibility)."""
import asyncio
async def run_test():
# Try to download to non-existent directory
outfile = "/tmp/nonexistent_dir_12345/test.bin"
try:
success = await DownloadManager.download_url(
"https://httpbin.org/bytes/100",
outfile=outfile
)
# Should fail because directory doesn't exist
self.assertFalse(success)
except Exception:
# Exception is acceptable
pass
asyncio.run(run_test())
def test_file_overwrite(self):
"""Test that downloading overwrites existing files."""
import asyncio
async def run_test():
outfile = f"{self.temp_dir}/overwrite_test.bin"
# Create initial file
with open(outfile, 'wb') as f:
f.write(b'old content')
# Download and overwrite
success = await DownloadManager.download_url(
"https://httpbin.org/bytes/100",
outfile=outfile
)
self.assertTrue(success)
self.assertEqual(os.stat(outfile)[6], 100)
# Verify old content is gone
with open(outfile, 'rb') as f:
content = f.read()
self.assertNotEqual(content, b'old content')
self.assertEqual(len(content), 100)
# Clean up
os.remove(outfile)
asyncio.run(run_test())