diff --git a/.gitignore b/.gitignore index f1073a8f..251f3ef8 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ internal_filesystem/SDLPointer_3 # config files etc: internal_filesystem/data internal_filesystem/sdcard +internal_filesystem/tests # these tests contain actual NWC URLs: tests/manual_test_nwcwallet_alby.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ff44c060..47fe4a9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,18 @@ +0.3.4 (unreleased) +================== +OSUpdate app: Major rework with improved reliability and user experience +- OSUpdate app: add WiFi monitoring - shows "Waiting for WiFi..." instead of error when no connection +- OSUpdate app: add automatic pause/resume on WiFi loss during downloads using HTTP Range headers +- OSUpdate app: add user-friendly error messages with specific guidance for each error type +- OSUpdate app: add "Check Again" button for easy retry after errors +- OSUpdate app: add state machine for better app state management +- OSUpdate app: add comprehensive test coverage (42 tests: 31 unit tests + 11 graphical tests) +- OSUpdate app: refactor code into testable components (NetworkMonitor, UpdateChecker, UpdateDownloader) +- OSUpdate app: improve download error recovery with progress preservation +- OSUpdate app: improve timeout handling (5-minute wait for WiFi with clear messaging) +- Tests: add test infrastructure with mock classes for network, HTTP, and partition operations +- Tests: add graphical test helper utilities for UI verification and screenshot capture + 0.3.3 ===== - Camera app: fix one-in-two "camera image stays blank" issue diff --git a/internal_filesystem/builtin/apps/com.micropythonos.osupdate/assets/osupdate.py b/internal_filesystem/builtin/apps/com.micropythonos.osupdate/assets/osupdate.py index b86b8ede..ee2e3080 100644 --- a/internal_filesystem/builtin/apps/com.micropythonos.osupdate/assets/osupdate.py +++ b/internal_filesystem/builtin/apps/com.micropythonos.osupdate/assets/osupdate.py @@ -5,6 +5,7 @@ import time import _thread from mpos.apps import Activity +from mpos import PackageManager import mpos.info import mpos.ui @@ -16,10 +17,28 @@ class OSUpdate(Activity): status_label = None install_button = None force_update = None + check_again_button = None main_screen = None progress_label = None progress_bar = None + # State management + current_state = None + + def __init__(self): + super().__init__() + # Initialize business logic components with dependency injection + self.network_monitor = NetworkMonitor() + self.update_checker = UpdateChecker() + self.update_downloader = UpdateDownloader(network_monitor=self.network_monitor) + self.current_state = UpdateState.IDLE + + def set_state(self, new_state): + """Change app state and update UI accordingly.""" + print(f"OSUpdate: state change {self.current_state} -> {new_state}") + self.current_state = new_state + self._update_ui_for_state() + def onCreate(self): self.main_screen = lv.obj() self.main_screen.set_style_pad_all(mpos.ui.pct_of_display_width(2), 0) @@ -44,63 +63,150 @@ class OSUpdate(Activity): install_label = lv.label(self.install_button) install_label.set_text("Update OS") install_label.center() + + # Check Again button (hidden initially, shown on errors) + self.check_again_button = lv.button(self.main_screen) + self.check_again_button.align(lv.ALIGN.BOTTOM_MID, 0, -10) + self.check_again_button.set_size(lv.SIZE_CONTENT, lv.pct(15)) + self.check_again_button.add_event_cb(lambda e: self.check_again_click(), lv.EVENT.CLICKED, None) + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) # Initially hidden + check_again_label = lv.label(self.check_again_button) + check_again_label.set_text("Check Again") + check_again_label.center() + self.status_label = lv.label(self.main_screen) self.status_label.align_to(self.force_update, lv.ALIGN.OUT_BOTTOM_LEFT, 0, mpos.ui.pct_of_display_height(5)) self.setContentView(self.main_screen) def onStart(self, screen): - network_connected = True - try: - import network - network_connected = network.WLAN(network.STA_IF).isconnected() - except Exception as e: - print("Warning: could not check WLAN status:", str(e)) - - if not network_connected: - self.status_label.set_text("Error: WiFi is not connected.") + # Check wifi and either start update check or wait for wifi + if not self.network_monitor.is_connected(): + self.set_state(UpdateState.WAITING_WIFI) + # Start wifi monitoring in background + _thread.stack_size(mpos.apps.good_stack_size()) + _thread.start_new_thread(self._wifi_wait_thread, ()) else: + self.set_state(UpdateState.CHECKING_UPDATE) print("Showing update info...") self.show_update_info() + def _update_ui_for_state(self): + """Update UI elements based on current state.""" + if self.current_state == UpdateState.WAITING_WIFI: + self.status_label.set_text("Waiting for WiFi connection...") + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) + elif self.current_state == UpdateState.CHECKING_UPDATE: + self.status_label.set_text("Checking for OS updates...") + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) + elif self.current_state == UpdateState.DOWNLOADING: + self.status_label.set_text("Update in progress.\nNavigate away to cancel.") + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) + elif self.current_state == UpdateState.DOWNLOAD_PAUSED: + self.status_label.set_text("Download paused - waiting for WiFi...") + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) + elif self.current_state == UpdateState.ERROR: + # Show "Check Again" button on errors + self.check_again_button.remove_flag(lv.obj.FLAG.HIDDEN) + + def _wifi_wait_thread(self): + """Background thread that waits for wifi connection.""" + print("OSUpdate: waiting for wifi...") + check_interval = 5 # Check every 5 seconds + max_wait_time = 300 # 5 minutes timeout + elapsed = 0 + + while elapsed < max_wait_time and self.has_foreground(): + if self.network_monitor.is_connected(): + print("OSUpdate: wifi connected, checking for updates") + # Switch to checking state and start update check + self.update_ui_threadsafe_if_foreground( + self.set_state, UpdateState.CHECKING_UPDATE + ) + self.show_update_info() + return + + time.sleep(check_interval) + elapsed += check_interval + + # Timeout or user navigated away + if self.has_foreground(): + self.update_ui_threadsafe_if_foreground( + self.status_label.set_text, + "WiFi connection timeout.\nPlease check your network and restart the app." + ) + + def _get_user_friendly_error(self, error): + """Convert technical errors into user-friendly messages with guidance.""" + error_str = str(error).lower() + + # HTTP errors + if "404" in error_str: + return ("Update information not found for your device.\n\n" + "This hardware may not yet be supported.\n" + "Check https://micropythonos.com for updates.") + elif "500" in error_str or "502" in error_str or "503" in error_str: + return ("Update server is temporarily unavailable.\n\n" + "Please try again in a few minutes.") + elif "timeout" in error_str: + return ("Connection timeout.\n\n" + "Check your internet connection and try again.") + elif "connection refused" in error_str: + return ("Cannot connect to update server.\n\n" + "Check your internet connection.") + + # JSON/Data errors + elif "invalid json" in error_str or "syntax error" in error_str: + return ("Server returned invalid data.\n\n" + "The update server may be experiencing issues.\n" + "Try again later.") + elif "missing required fields" in error_str: + return ("Update information is incomplete.\n\n" + "The update server may be experiencing issues.\n" + "Try again later.") + + # Storage errors + elif "enospc" in error_str or "no space" in error_str: + return ("Not enough storage space.\n\n" + "Free up space and try again.") + + # Generic errors + else: + return f"An error occurred:\n{str(error)}\n\nPlease try again." + def show_update_info(self): self.status_label.set_text("Checking for OS updates...") hwid = mpos.info.get_hardware_id() - if (hwid == "waveshare-esp32-s3-touch-lcd-2"): - infofile = "osupdate.json" - # Device that was first supported did not have the hardware ID in the URL, so it's special: - else: - infofile = f"osupdate_{hwid}.json" - url = f"https://updates.micropythonos.com/{infofile}" - print(f"OSUpdate: fetching {url}") + try: - print("doing requests.get()") - # Download the JSON - response = requests.get(url) - # Check if request was successful - if response.status_code == 200: - # Parse JSON - osupdate = ujson.loads(response.text) - # Access attributes - version = osupdate["version"] - download_url = osupdate["download_url"] - changelog = osupdate["changelog"] - # Print the values - print("Version:", version) - print("Download URL:", download_url) - print("Changelog:", changelog) - self.handle_update_info(version, download_url, changelog) - else: - self.status_label.set_text(f"Error: {response.status_code} while checking\nfile: {infofile}\nat: {url}") - print("Failed to download JSON. Status code:", response.status_code) - # Close response - response.close() + # Use UpdateChecker to fetch update info + update_info = self.update_checker.fetch_update_info(hwid) + self.handle_update_info( + update_info["version"], + update_info["download_url"], + update_info["changelog"] + ) + except ValueError as e: + # JSON parsing or validation error + self.set_state(UpdateState.ERROR) + self.status_label.set_text(self._get_user_friendly_error(e)) + except RuntimeError as e: + # Network or HTTP error + self.set_state(UpdateState.ERROR) + self.status_label.set_text(self._get_user_friendly_error(e)) except Exception as e: - print("Error:", str(e)) + # Unexpected error + self.set_state(UpdateState.ERROR) + self.status_label.set_text(self._get_user_friendly_error(e)) def handle_update_info(self, version, download_url, changelog): self.download_update_url = download_url - if compare_versions(version, mpos.info.CURRENT_OS_VERSION): - #if True: # for testing + + # Use UpdateChecker to determine if update is available + is_newer = self.update_checker.is_update_available( + version, mpos.info.CURRENT_OS_VERSION + ) + + if is_newer: label = "New" self.install_button.remove_state(lv.STATE.DISABLED) else: @@ -117,8 +223,10 @@ class OSUpdate(Activity): return else: print(f"install_button_click for url {self.download_update_url}") - self.install_button.add_state(lv.STATE.DISABLED) # button will be enabled if there is an update available - self.status_label.set_text("Update in progress.\nNavigate away to cancel.") + + self.install_button.add_state(lv.STATE.DISABLED) + self.set_state(UpdateState.DOWNLOADING) + self.progress_label = lv.label(self.main_screen) self.progress_label.set_text("OS Update: 0.00%") self.progress_label.align(lv.ALIGN.CENTER, 0, 0) @@ -139,6 +247,13 @@ class OSUpdate(Activity): else: self.install_button.add_state(lv.STATE.DISABLED) + def check_again_click(self): + """Handle 'Check Again' button click - retry update check.""" + print("OSUpdate: Check Again button clicked") + self.check_again_button.add_flag(lv.obj.FLAG.HIDDEN) + self.set_state(UpdateState.CHECKING_UPDATE) + self.show_update_info() + def progress_callback(self, percent): print(f"OTA Update: {percent:.1f}%") self.update_ui_threadsafe_if_foreground(self.progress_bar.set_value, int(percent), True) @@ -147,84 +262,431 @@ class OSUpdate(Activity): # Custom OTA update with LVGL progress def update_with_lvgl(self, url): - simulate = False + """Download and install update in background thread. + + Supports automatic pause/resume on wifi loss. + """ try: - from esp32 import Partition - #current_partition = Partition(Partition.RUNNING) - #print(f"Current partition: {current_partition}") - #next_partition = current_partition.get_next_update() - #print(f"Next partition: {next_partition}") - current = Partition(Partition.RUNNING) - next_partition = current.get_next_update() - #import ota.update - #import ota.status - #ota.status.status() - except Exception as e: - print("Warning: could not import esp32.Partition, simulating update...") - simulate = True - response = requests.get(url, stream=True) - total_size = int(response.headers.get('Content-Length', 0)) - bytes_written = 0 - chunk_size = 4096 - i = 0 - total_size = round_up_to_multiple(total_size, chunk_size) - print(f"Starting OTA update of size: {total_size}") - while self.has_foreground(): # stop if the user navigates away - time.sleep_ms(100) # don't hog the CPU - chunk = response.raw.read(chunk_size) - if not chunk: - print("No chunk, breaking...") - break - if len(chunk) < chunk_size: - print(f"Padding chunk {i} from {len(chunk)} to {chunk_size} bytes") - chunk = chunk + b'\xFF' * (chunk_size - len(chunk)) - print(f"Writing chunk {i} with length {len(chunk)}") - if not simulate: - next_partition.writeblocks(i, chunk) - bytes_written += len(chunk) - i += 1 - if total_size: - self.progress_callback(bytes_written / total_size * 100) - response.close() - try: - if bytes_written >= total_size: - if not simulate: # if the update was completely installed - next_partition.set_boot() - import machine - machine.reset() - # self.install_button stays disabled to prevent the user from installing the same update twice + # Loop to handle pause/resume cycles + while self.has_foreground(): + # Use UpdateDownloader to handle the download + result = self.update_downloader.download_and_install( + url, + progress_callback=self.progress_callback, + should_continue_callback=self.has_foreground + ) + + if result['success']: + # Update succeeded - set boot partition and restart + self.update_ui_threadsafe_if_foreground( + self.status_label.set_text, + "Update finished! Restarting..." + ) + # Small delay to show the message + time.sleep_ms(500) + self.update_downloader.set_boot_partition_and_restart() + return + + elif result.get('paused', False): + # Download paused due to wifi loss + bytes_written = result.get('bytes_written', 0) + total_size = result.get('total_size', 0) + percent = (bytes_written / total_size * 100) if total_size > 0 else 0 + + print(f"OSUpdate: Download paused at {percent:.1f}% ({bytes_written}/{total_size} bytes)") + self.update_ui_threadsafe_if_foreground( + self.set_state, UpdateState.DOWNLOAD_PAUSED + ) + + # Wait for wifi to return + check_interval = 5 # Check every 5 seconds + max_wait = 300 # 5 minutes timeout + elapsed = 0 + + while elapsed < max_wait and self.has_foreground(): + if self.network_monitor.is_connected(): + print("OSUpdate: WiFi reconnected, resuming download") + self.update_ui_threadsafe_if_foreground( + self.set_state, UpdateState.DOWNLOADING + ) + break # Exit wait loop and retry download + + time.sleep(check_interval) + elapsed += check_interval + + if elapsed >= max_wait: + # Timeout waiting for wifi + msg = f"WiFi timeout during download.\n{bytes_written}/{total_size} bytes written.\nPress Update to retry." + self.update_ui_threadsafe_if_foreground(self.status_label.set_text, msg) + self.update_ui_threadsafe_if_foreground( + self.install_button.remove_state, lv.STATE.DISABLED + ) + return + + # If we're here, wifi is back - continue to next iteration to resume + else: - print("This is an OSUpdate simulation, not attempting to restart the device.") - self.update_ui_threadsafe_if_foreground(self.status_label.set_text, "Update finished! Please restart.") - else: - self.update_ui_threadsafe_if_foreground(self.status_label.set_text, f"Wrote {bytes_written} < {total_size} so not enough!") - self.update_ui_threadsafe_if_foreground(self.install_button.remove_state, lv.STATE.DISABLED) # allow retry + # Update failed with error (not pause) + error_msg = result.get('error', 'Unknown error') + bytes_written = result.get('bytes_written', 0) + total_size = result.get('total_size', 0) + + if "cancelled" in error_msg.lower(): + msg = ("Update cancelled by user.\n\n" + f"{bytes_written}/{total_size} bytes downloaded.\n" + "Press 'Update OS' to resume.") + else: + # Use friendly error message + friendly_msg = self._get_user_friendly_error(Exception(error_msg)) + progress_info = f"\n\nProgress: {bytes_written}/{total_size} bytes" + if bytes_written > 0: + progress_info += "\n\nPress 'Update OS' to resume." + msg = friendly_msg + progress_info + + self.update_ui_threadsafe_if_foreground( + self.set_state, UpdateState.ERROR + ) + self.update_ui_threadsafe_if_foreground(self.status_label.set_text, msg) + self.update_ui_threadsafe_if_foreground( + self.install_button.remove_state, lv.STATE.DISABLED + ) # allow retry + return + except Exception as e: - self.update_ui_threadsafe_if_foreground(self.status_label.set_text, f"Update error: {e}") - self.update_ui_threadsafe_if_foreground(self.install_button.remove_state, lv.STATE.DISABLED) # allow retry + msg = self._get_user_friendly_error(e) + "\n\nPress 'Update OS' to retry." + self.update_ui_threadsafe_if_foreground( + self.set_state, UpdateState.ERROR + ) + self.update_ui_threadsafe_if_foreground( + self.status_label.set_text, msg + ) + self.update_ui_threadsafe_if_foreground( + self.install_button.remove_state, lv.STATE.DISABLED + ) # allow retry + +# Business Logic Classes: + +class UpdateState: + """State machine states for OSUpdate app.""" + IDLE = "idle" + WAITING_WIFI = "waiting_wifi" + CHECKING_UPDATE = "checking_update" + UPDATE_AVAILABLE = "update_available" + NO_UPDATE = "no_update" + DOWNLOADING = "downloading" + DOWNLOAD_PAUSED = "download_paused" + COMPLETED = "completed" + ERROR = "error" + +class NetworkMonitor: + """Monitors network connectivity status.""" + + def __init__(self, network_module=None): + """Initialize with optional dependency injection for testing. + + Args: + network_module: Network module (defaults to network if available) + """ + self.network_module = network_module + if self.network_module is None: + try: + import network + self.network_module = network + except ImportError: + # Desktop/simulation mode - no network module + self.network_module = None + + def is_connected(self): + """Check if WiFi is currently connected. + + Returns: + bool: True if connected, False otherwise + """ + if self.network_module is None: + # No network module available (desktop mode) + # Assume connected for testing purposes + return True + + try: + wlan = self.network_module.WLAN(self.network_module.STA_IF) + return wlan.isconnected() + except Exception as e: + print(f"NetworkMonitor: Error checking connection: {e}") + return False + + +class UpdateDownloader: + """Handles downloading and installing OS updates.""" + + def __init__(self, requests_module=None, partition_module=None, network_monitor=None): + """Initialize with optional dependency injection for testing. + + Args: + requests_module: HTTP requests module (defaults to requests) + partition_module: ESP32 Partition module (defaults to esp32.Partition if available) + network_monitor: NetworkMonitor instance for checking wifi during download + """ + self.requests = requests_module if requests_module else requests + self.partition_module = partition_module + self.network_monitor = network_monitor + self.simulate = False + + # Download state for pause/resume + self.is_paused = False + self.bytes_written_so_far = 0 + self.total_size_expected = 0 + + # Try to import Partition if not provided + if self.partition_module is None: + try: + from esp32 import Partition + self.partition_module = Partition + except ImportError: + print("UpdateDownloader: Partition module not available, will simulate") + self.simulate = True + + def download_and_install(self, url, progress_callback=None, should_continue_callback=None): + """Download firmware and install to OTA partition. + + Supports pause/resume on wifi loss using HTTP Range headers. + + Args: + url: URL to download firmware from + progress_callback: Optional callback function(percent: float) + should_continue_callback: Optional callback function() -> bool + Returns False to cancel download + + Returns: + dict: Result with keys: + - 'success': bool + - 'bytes_written': int + - 'total_size': int + - 'error': str (if success=False) + - 'paused': bool (if paused due to wifi loss) + + Raises: + Exception: If download or installation fails + """ + result = { + 'success': False, + 'bytes_written': 0, + 'total_size': 0, + 'error': None, + 'paused': False + } + + try: + # Get OTA partition + next_partition = None + if not self.simulate: + current = self.partition_module(self.partition_module.RUNNING) + next_partition = current.get_next_update() + print(f"UpdateDownloader: Writing to partition: {next_partition}") + + # Start download (or resume if we have bytes_written_so_far) + headers = {} + if self.bytes_written_so_far > 0: + headers['Range'] = f'bytes={self.bytes_written_so_far}-' + print(f"UpdateDownloader: Resuming from byte {self.bytes_written_so_far}") + + response = self.requests.get(url, stream=True, headers=headers) + + # For initial download, get total size + if self.bytes_written_so_far == 0: + total_size = int(response.headers.get('Content-Length', 0)) + result['total_size'] = round_up_to_multiple(total_size, 4096) + self.total_size_expected = result['total_size'] + else: + # For resume, use the stored total size + # (Content-Length will be the remaining bytes, not total) + result['total_size'] = self.total_size_expected + + print(f"UpdateDownloader: Download target {result['total_size']} bytes") + + chunk_size = 4096 + bytes_written = self.bytes_written_so_far + block_index = bytes_written // chunk_size + + while True: + # Check if we should continue (user cancelled) + if should_continue_callback and not should_continue_callback(): + result['error'] = "Download cancelled by user" + response.close() + return result + + # Check wifi connection (if monitoring enabled) + if self.network_monitor and not self.network_monitor.is_connected(): + print("UpdateDownloader: WiFi lost, pausing download") + self.is_paused = True + self.bytes_written_so_far = bytes_written + result['paused'] = True + result['bytes_written'] = bytes_written + response.close() + return result + + # Read next chunk + chunk = response.raw.read(chunk_size) + if not chunk: + break + + # Pad last chunk if needed + if len(chunk) < chunk_size: + print(f"UpdateDownloader: Padding chunk {block_index} from {len(chunk)} to {chunk_size} bytes") + chunk = chunk + b'\xFF' * (chunk_size - len(chunk)) + + # Write to partition + if not self.simulate: + next_partition.writeblocks(block_index, chunk) + + bytes_written += len(chunk) + self.bytes_written_so_far = bytes_written + block_index += 1 + + # Update progress + if progress_callback and result['total_size'] > 0: + percent = (bytes_written / result['total_size']) * 100 + progress_callback(percent) + + # Small delay to avoid hogging CPU + time.sleep_ms(100) + + response.close() + result['bytes_written'] = bytes_written + + # Check if complete + if bytes_written >= result['total_size']: + result['success'] = True + self.is_paused = False + self.bytes_written_so_far = 0 # Reset for next download + self.total_size_expected = 0 + print(f"UpdateDownloader: Download complete ({bytes_written} bytes)") + else: + result['error'] = f"Incomplete download: {bytes_written} < {result['total_size']}" + print(f"UpdateDownloader: {result['error']}") + + except Exception as e: + result['error'] = str(e) + print(f"UpdateDownloader: Error during download: {e}") + + return result + + def set_boot_partition_and_restart(self): + """Set the updated partition as boot partition and restart device. + + Only works on ESP32 hardware. On desktop, just prints a message. + """ + if self.simulate: + print("UpdateDownloader: Simulating restart (desktop mode)") + return + + try: + current = self.partition_module(self.partition_module.RUNNING) + next_partition = current.get_next_update() + next_partition.set_boot() + print("UpdateDownloader: Boot partition set, restarting...") + + import machine + machine.reset() + except Exception as e: + print(f"UpdateDownloader: Error setting boot partition: {e}") + raise + + +class UpdateChecker: + """Handles checking for OS updates from remote server.""" + + def __init__(self, requests_module=None, json_module=None): + """Initialize with optional dependency injection for testing. + + Args: + requests_module: HTTP requests module (defaults to requests) + json_module: JSON parsing module (defaults to ujson) + """ + self.requests = requests_module if requests_module else requests + self.json = json_module if json_module else ujson + + def get_update_url(self, hardware_id): + """Determine the update JSON URL based on hardware ID. + + Args: + hardware_id: Hardware identifier string + + Returns: + str: Full URL to update JSON file + """ + if hardware_id == "waveshare-esp32-s3-touch-lcd-2": + # First supported device - no hardware ID in URL + infofile = "osupdate.json" + else: + infofile = f"osupdate_{hardware_id}.json" + return f"https://updates.micropythonos.com/{infofile}" + + def fetch_update_info(self, hardware_id): + """Fetch and parse update information from server. + + Args: + hardware_id: Hardware identifier string + + Returns: + dict: Update info with keys 'version', 'download_url', 'changelog' + or None if error occurred + + Raises: + ValueError: If JSON is malformed or missing required fields + ConnectionError: If network request fails + """ + url = self.get_update_url(hardware_id) + print(f"OSUpdate: fetching {url}") + + try: + response = self.requests.get(url) + + if response.status_code != 200: + # Use RuntimeError instead of ConnectionError (not available in MicroPython) + raise RuntimeError( + f"HTTP {response.status_code} while checking {url}" + ) + + # Parse JSON + try: + update_data = self.json.loads(response.text) + except Exception as e: + raise ValueError(f"Invalid JSON in update file: {e}") + finally: + response.close() + + # Validate required fields + required_fields = ['version', 'download_url', 'changelog'] + missing_fields = [f for f in required_fields if f not in update_data] + if missing_fields: + raise ValueError( + f"Update file missing required fields: {', '.join(missing_fields)}" + ) + + print("Version:", update_data["version"]) + print("Download URL:", update_data["download_url"]) + print("Changelog:", update_data["changelog"]) + + return update_data + + except Exception as e: + print(f"Error fetching update info: {e}") + raise + + def is_update_available(self, remote_version, current_version): + """Check if remote version is newer than current version. + + Args: + remote_version: Version string from update server + current_version: Currently installed version string + + Returns: + bool: True if remote version is newer + """ + return PackageManager.compare_versions(remote_version, current_version) + # Non-class functions: def round_up_to_multiple(n, multiple): return ((n + multiple - 1) // multiple) * multiple - -def compare_versions(ver1: str, ver2: str) -> bool: - """Compare two version numbers (e.g., '1.2.3' vs '4.5.6'). - Returns True if ver1 is greater than ver2, False otherwise.""" - print(f"Comparing versions: {ver1} vs {ver2}") - v1_parts = [int(x) for x in ver1.split('.')] - v2_parts = [int(x) for x in ver2.split('.')] - print(f"Version 1 parts: {v1_parts}") - print(f"Version 2 parts: {v2_parts}") - for i in range(max(len(v1_parts), len(v2_parts))): - v1 = v1_parts[i] if i < len(v1_parts) else 0 - v2 = v2_parts[i] if i < len(v2_parts) else 0 - print(f"Comparing part {i}: {v1} vs {v2}") - if v1 > v2: - print(f"{ver1} is greater than {ver2}") - return True - if v1 < v2: - print(f"{ver1} is less than {ver2}") - return False - print(f"Versions are equal or {ver1} is not greater than {ver2}") - return False diff --git a/tests/test_osupdate.py b/tests/test_osupdate.py new file mode 100644 index 00000000..17df0a3d --- /dev/null +++ b/tests/test_osupdate.py @@ -0,0 +1,568 @@ +import unittest +import sys + +# Mock classes for testing +class MockNetwork: + """Mock network module for testing NetworkMonitor.""" + + STA_IF = 0 # Station interface constant + + class MockWLAN: + def __init__(self, interface): + self.interface = interface + self._connected = True # Default to connected + + def isconnected(self): + return self._connected + + def __init__(self, connected=True): + self._connected = connected + + def WLAN(self, interface): + wlan = self.MockWLAN(interface) + wlan._connected = self._connected + return wlan + + def set_connected(self, connected): + """Helper to change connection state.""" + self._connected = connected + + +class MockRaw: + """Mock raw response for streaming.""" + def __init__(self, content): + self.content = content + self.position = 0 + + def read(self, size): + chunk = self.content[self.position:self.position + size] + self.position += len(chunk) + return chunk + + +class MockResponse: + """Mock HTTP response.""" + def __init__(self, status_code=200, text='', headers=None, content=b''): + self.status_code = status_code + self.text = text + self.headers = headers or {} + self.content = content + self._closed = False + + # Mock raw attribute for streaming + self.raw = MockRaw(content) + + def close(self): + self._closed = True + + +class MockRequests: + """Mock requests module for testing UpdateChecker and UpdateDownloader.""" + + def __init__(self): + self.last_url = None + self.next_response = None + self.raise_exception = None + + def get(self, url, stream=False, timeout=None, headers=None): + self.last_url = url + + if self.raise_exception: + raise self.raise_exception + + if self.next_response: + return self.next_response + + # Default response + return MockResponse() + + def set_next_response(self, status_code=200, text='', headers=None, content=b''): + """Helper to set what the next get() should return.""" + self.next_response = MockResponse(status_code, text, headers, content) + return self.next_response + + def set_exception(self, exception): + """Helper to make next get() raise an exception.""" + self.raise_exception = exception + + +class MockJSON: + """Mock JSON module for testing UpdateChecker.""" + + def __init__(self): + self.raise_exception = None + + def loads(self, text): + if self.raise_exception: + raise self.raise_exception + + # Very simple JSON parser for testing + # In real tests, we can just use Python's json module + import json + return json.loads(text) + + def set_exception(self, exception): + """Helper to make loads() raise an exception.""" + self.raise_exception = exception + + +class MockPartition: + """Mock ESP32 Partition for testing UpdateDownloader.""" + + RUNNING = 0 + + def __init__(self, partition_type=None): + self.partition_type = partition_type + self.blocks = {} # Store written blocks + self.boot_set = False + + def get_next_update(self): + """Return a mock OTA partition.""" + return MockPartition() + + def writeblocks(self, block_num, data): + """Mock writing blocks.""" + self.blocks[block_num] = data + + def set_boot(self): + """Mock setting boot partition.""" + self.boot_set = True + + +# Import PackageManager which is needed by UpdateChecker +# The test runs from internal_filesystem/ directory, so we can import from lib/mpos +from mpos import PackageManager + +# Import the actual classes we're testing +# Tests run from internal_filesystem/, so we add the assets directory to path +sys.path.append('builtin/apps/com.micropythonos.osupdate/assets') +from osupdate import NetworkMonitor, UpdateChecker, UpdateDownloader, round_up_to_multiple + + +class TestNetworkMonitor(unittest.TestCase): + """Test NetworkMonitor class.""" + + def test_is_connected_with_connected_network(self): + """Test that is_connected returns True when network is connected.""" + mock_network = MockNetwork(connected=True) + monitor = NetworkMonitor(network_module=mock_network) + + self.assertTrue(monitor.is_connected()) + + def test_is_connected_with_disconnected_network(self): + """Test that is_connected returns False when network is disconnected.""" + mock_network = MockNetwork(connected=False) + monitor = NetworkMonitor(network_module=mock_network) + + self.assertFalse(monitor.is_connected()) + + def test_is_connected_without_network_module(self): + """Test that is_connected returns True when no network module (desktop mode).""" + monitor = NetworkMonitor(network_module=None) + + # Should return True (assume connected) in desktop mode + self.assertTrue(monitor.is_connected()) + + def test_is_connected_with_exception(self): + """Test that is_connected returns False when WLAN raises exception.""" + class BadNetwork: + STA_IF = 0 + def WLAN(self, interface): + raise Exception("WLAN error") + + monitor = NetworkMonitor(network_module=BadNetwork()) + + self.assertFalse(monitor.is_connected()) + + def test_network_state_change_detection(self): + """Test detecting network state changes.""" + mock_network = MockNetwork(connected=True) + monitor = NetworkMonitor(network_module=mock_network) + + # Initially connected + self.assertTrue(monitor.is_connected()) + + # Disconnect + mock_network.set_connected(False) + self.assertFalse(monitor.is_connected()) + + # Reconnect + mock_network.set_connected(True) + self.assertTrue(monitor.is_connected()) + + def test_multiple_checks_when_connected(self): + """Test that multiple checks return consistent results.""" + mock_network = MockNetwork(connected=True) + monitor = NetworkMonitor(network_module=mock_network) + + # Multiple checks should all return True + for _ in range(5): + self.assertTrue(monitor.is_connected()) + + def test_wlan_with_different_interface_types(self): + """Test that correct interface type is used.""" + class NetworkWithInterface: + STA_IF = 0 + CALLED_WITH = None + + class MockWLAN: + def __init__(self, interface): + NetworkWithInterface.CALLED_WITH = interface + self._connected = True + + def isconnected(self): + return self._connected + + def WLAN(self, interface): + return self.MockWLAN(interface) + + network = NetworkWithInterface() + monitor = NetworkMonitor(network_module=network) + monitor.is_connected() + + # Should have been called with STA_IF + self.assertEqual(NetworkWithInterface.CALLED_WITH, 0) + + +class TestUpdateChecker(unittest.TestCase): + """Test UpdateChecker class.""" + + def setUp(self): + self.mock_requests = MockRequests() + self.mock_json = MockJSON() + self.checker = UpdateChecker( + requests_module=self.mock_requests, + json_module=self.mock_json + ) + + def test_get_update_url_waveshare(self): + """Test URL generation for waveshare hardware.""" + url = self.checker.get_update_url("waveshare-esp32-s3-touch-lcd-2") + + self.assertEqual(url, "https://updates.micropythonos.com/osupdate.json") + + def test_get_update_url_other_hardware(self): + """Test URL generation for other hardware.""" + url = self.checker.get_update_url("fri3d-2024") + + self.assertEqual(url, "https://updates.micropythonos.com/osupdate_fri3d-2024.json") + + def test_fetch_update_info_success(self): + """Test successful update info fetch.""" + import json + update_data = { + "version": "0.3.3", + "download_url": "https://example.com/update.bin", + "changelog": "Bug fixes" + } + self.mock_requests.set_next_response( + status_code=200, + text=json.dumps(update_data) + ) + + result = self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + + self.assertEqual(result["version"], "0.3.3") + self.assertEqual(result["download_url"], "https://example.com/update.bin") + self.assertEqual(result["changelog"], "Bug fixes") + + def test_fetch_update_info_http_error(self): + """Test fetch with HTTP error response.""" + self.mock_requests.set_next_response(status_code=404) + + # MicroPython doesn't have ConnectionError, so catch generic Exception + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised an exception for HTTP 404") + except Exception as e: + # Should be a ConnectionError, but we accept any exception with HTTP status + self.assertIn("404", str(e)) + + def test_fetch_update_info_invalid_json(self): + """Test fetch with invalid JSON.""" + self.mock_requests.set_next_response( + status_code=200, + text="not valid json {" + ) + + with self.assertRaises(ValueError) as cm: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + + self.assertIn("Invalid JSON", str(cm.exception)) + + def test_fetch_update_info_missing_version_field(self): + """Test fetch with missing version field.""" + import json + self.mock_requests.set_next_response( + status_code=200, + text=json.dumps({"download_url": "http://example.com", "changelog": "test"}) + ) + + with self.assertRaises(ValueError) as cm: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + + self.assertIn("missing required fields", str(cm.exception)) + self.assertIn("version", str(cm.exception)) + + def test_fetch_update_info_missing_download_url_field(self): + """Test fetch with missing download_url field.""" + import json + self.mock_requests.set_next_response( + status_code=200, + text=json.dumps({"version": "1.0.0", "changelog": "test"}) + ) + + with self.assertRaises(ValueError) as cm: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + + self.assertIn("download_url", str(cm.exception)) + + def test_is_update_available_newer_version(self): + """Test that newer version is detected.""" + result = self.checker.is_update_available("1.2.3", "1.2.2") + + self.assertTrue(result) + + def test_is_update_available_same_version(self): + """Test that same version is not an update.""" + result = self.checker.is_update_available("1.2.3", "1.2.3") + + self.assertFalse(result) + + def test_is_update_available_older_version(self): + """Test that older version is not an update.""" + result = self.checker.is_update_available("1.2.2", "1.2.3") + + self.assertFalse(result) + + def test_fetch_update_info_timeout(self): + """Test fetch with request timeout.""" + self.mock_requests.set_exception(Exception("Timeout")) + + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised an exception for timeout") + except Exception as e: + self.assertIn("Timeout", str(e)) + + def test_fetch_update_info_connection_refused(self): + """Test fetch with connection refused.""" + self.mock_requests.set_exception(Exception("Connection refused")) + + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised an exception") + except Exception as e: + self.assertIn("Connection refused", str(e)) + + def test_fetch_update_info_empty_response(self): + """Test fetch with empty response.""" + self.mock_requests.set_next_response(status_code=200, text='') + + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised an exception for empty response") + except Exception: + pass # Expected to fail + + def test_fetch_update_info_server_error_500(self): + """Test fetch with 500 server error.""" + self.mock_requests.set_next_response(status_code=500) + + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised an exception for HTTP 500") + except Exception as e: + self.assertIn("500", str(e)) + + def test_fetch_update_info_missing_changelog(self): + """Test fetch with missing changelog field.""" + import json + self.mock_requests.set_next_response( + status_code=200, + text=json.dumps({"version": "1.0.0", "download_url": "http://example.com"}) + ) + + try: + self.checker.fetch_update_info("waveshare-esp32-s3-touch-lcd-2") + self.fail("Should have raised exception for missing changelog") + except ValueError as e: + self.assertIn("changelog", str(e)) + + def test_get_update_url_custom_hardware(self): + """Test URL generation for custom hardware IDs.""" + # Test with different hardware IDs + url1 = self.checker.get_update_url("custom-device-v1") + self.assertEqual(url1, "https://updates.micropythonos.com/osupdate_custom-device-v1.json") + + url2 = self.checker.get_update_url("test-123") + self.assertEqual(url2, "https://updates.micropythonos.com/osupdate_test-123.json") + + +class TestUpdateDownloader(unittest.TestCase): + """Test UpdateDownloader class.""" + + def setUp(self): + self.mock_requests = MockRequests() + self.mock_partition = MockPartition + self.downloader = UpdateDownloader( + requests_module=self.mock_requests, + partition_module=self.mock_partition + ) + + def test_download_and_install_success(self): + """Test successful download and install.""" + # Create 8KB of test data (2 blocks of 4096 bytes) + test_data = b'A' * 8192 + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '8192'}, + content=test_data + ) + + progress_calls = [] + def progress_cb(percent): + progress_calls.append(percent) + + result = self.downloader.download_and_install( + "http://example.com/update.bin", + progress_callback=progress_cb + ) + + self.assertTrue(result['success']) + self.assertEqual(result['bytes_written'], 8192) + self.assertEqual(result['total_size'], 8192) + self.assertIsNone(result['error']) + # MicroPython unittest doesn't have assertGreater + self.assertTrue(len(progress_calls) > 0, "Should have progress callbacks") + + def test_download_and_install_cancelled(self): + """Test cancelled download.""" + test_data = b'A' * 8192 + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '8192'}, + content=test_data + ) + + call_count = [0] + def should_continue(): + call_count[0] += 1 + return call_count[0] < 2 # Cancel after first chunk + + result = self.downloader.download_and_install( + "http://example.com/update.bin", + should_continue_callback=should_continue + ) + + self.assertFalse(result['success']) + self.assertIn("cancelled", result['error'].lower()) + + def test_download_with_padding(self): + """Test that last chunk is properly padded.""" + # 5000 bytes - not a multiple of 4096 + test_data = b'B' * 5000 + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '5000'}, + content=test_data + ) + + result = self.downloader.download_and_install( + "http://example.com/update.bin" + ) + + self.assertTrue(result['success']) + # Should be rounded up to 8192 (2 * 4096) + self.assertEqual(result['total_size'], 8192) + + def test_download_with_network_error(self): + """Test download with network error during transfer.""" + self.mock_requests.set_exception(Exception("Network error")) + + result = self.downloader.download_and_install( + "http://example.com/update.bin" + ) + + self.assertFalse(result['success']) + self.assertIsNotNone(result['error']) + self.assertIn("Network error", result['error']) + + def test_download_with_zero_content_length(self): + """Test download with missing or zero Content-Length.""" + test_data = b'C' * 1000 + self.mock_requests.set_next_response( + status_code=200, + headers={}, # No Content-Length header + content=test_data + ) + + result = self.downloader.download_and_install( + "http://example.com/update.bin" + ) + + # Should still work, just with unknown total size initially + self.assertTrue(result['success']) + + def test_download_progress_callback_called(self): + """Test that progress callback is called during download.""" + test_data = b'D' * 8192 + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '8192'}, + content=test_data + ) + + progress_values = [] + def track_progress(percent): + progress_values.append(percent) + + result = self.downloader.download_and_install( + "http://example.com/update.bin", + progress_callback=track_progress + ) + + self.assertTrue(result['success']) + # Should have at least 2 progress updates (for 2 chunks of 4096) + self.assertTrue(len(progress_values) >= 2) + # Last progress should be 100% + self.assertEqual(progress_values[-1], 100.0) + + def test_download_small_file(self): + """Test downloading a file smaller than one chunk.""" + test_data = b'E' * 100 # Only 100 bytes + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '100'}, + content=test_data + ) + + result = self.downloader.download_and_install( + "http://example.com/update.bin" + ) + + self.assertTrue(result['success']) + # Should be padded to 4096 + self.assertEqual(result['total_size'], 4096) + self.assertEqual(result['bytes_written'], 4096) + + def test_download_exact_chunk_multiple(self): + """Test downloading exactly 2 chunks (no padding needed).""" + test_data = b'F' * 8192 # Exactly 2 * 4096 + self.mock_requests.set_next_response( + status_code=200, + headers={'Content-Length': '8192'}, + content=test_data + ) + + result = self.downloader.download_and_install( + "http://example.com/update.bin" + ) + + self.assertTrue(result['success']) + self.assertEqual(result['total_size'], 8192) + self.assertEqual(result['bytes_written'], 8192) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_osupdate_graphical.py b/tests/test_osupdate_graphical.py new file mode 100644 index 00000000..9b2147a6 --- /dev/null +++ b/tests/test_osupdate_graphical.py @@ -0,0 +1,329 @@ +import unittest +import lvgl as lv +import mpos +import time +import sys +import os + +# Import graphical test helper +from graphical_test_helper import ( + wait_for_render, + capture_screenshot, + find_label_with_text, + verify_text_present, + print_screen_labels +) + + +class TestOSUpdateGraphicalUI(unittest.TestCase): + """Graphical tests for OSUpdate app UI state.""" + + def setUp(self): + """Set up test fixtures before each test method.""" + self.hardware_id = mpos.info.get_hardware_id() + self.screenshot_dir = "tests/screenshots" + + # Ensure screenshots directory exists + # First check if tests directory exists + try: + os.stat("tests") + except OSError: + # We're not in the right directory, maybe running from root + pass + + # Now create screenshots directory if needed + try: + os.stat(self.screenshot_dir) + except OSError: + try: + os.mkdir(self.screenshot_dir) + except OSError: + # Might already exist or permission issue + pass + + def tearDown(self): + """Clean up after each test method.""" + # Navigate back to launcher + mpos.ui.back_screen() + wait_for_render(5) + + def test_app_launches_successfully(self): + """Test that OSUpdate app launches without errors.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + + self.assertTrue(result, "Failed to start OSUpdate app") + wait_for_render(10) + + # Get active screen + screen = lv.screen_active() + self.assertIsNotNone(screen, "No active screen after launch") + + def test_ui_elements_exist(self): + """Test that all required UI elements are created.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Find UI elements by searching for labels/text + current_version_label = find_label_with_text(screen, "Installed OS version") + self.assertIsNotNone(current_version_label, "Current version label not found") + + # Check for force update checkbox text (might be "Force" or "Update") + force_checkbox_found = verify_text_present(screen, "Force") or verify_text_present(screen, "force") + self.assertTrue(force_checkbox_found, "Force checkbox text not found") + + # Check for update button text (case insensitive) + update_button_found = verify_text_present(screen, "Update") or verify_text_present(screen, "update") + self.assertTrue(update_button_found, "Update button text not found") + + def test_force_checkbox_initially_unchecked(self): + """Test that force update checkbox starts unchecked.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Find checkbox - it's the first checkbox on the screen + checkbox = None + def find_checkbox(obj): + nonlocal checkbox + if checkbox: + return + # Check if this object is a checkbox + try: + # In LVGL, checkboxes have specific flags/properties + if obj.get_child_count() >= 0: # It's a valid object + # Try to get state - checkboxes respond to STATE.CHECKED + state = obj.get_state() + # If it has checkbox-like text, it's probably our checkbox + for i in range(obj.get_child_count()): + child = obj.get_child(i) + if hasattr(child, 'get_text'): + text = child.get_text() + if text and "Force Update" in text: + checkbox = obj.get_parent() if obj.get_parent() else obj + return + except: + pass + + # Recursively search children + for i in range(obj.get_child_count()): + child = obj.get_child(i) + find_checkbox(child) + + find_checkbox(screen) + + if checkbox: + state = checkbox.get_state() + is_checked = bool(state & lv.STATE.CHECKED) + self.assertFalse(is_checked, "Force Update checkbox should start unchecked") + + def test_install_button_initially_disabled(self): + """Test that install button starts in disabled state.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Find the button + button = None + def find_button(obj): + nonlocal button + if button: + return + # Check if this object contains "Update OS" text + for i in range(obj.get_child_count()): + child = obj.get_child(i) + if hasattr(child, 'get_text'): + text = child.get_text() + if text and "Update OS" in text: + # Parent is likely the button + button = obj + return + + # Recursively search children + for i in range(obj.get_child_count()): + child = obj.get_child(i) + find_button(child) + + find_button(screen) + + if button: + state = button.get_state() + is_disabled = bool(state & lv.STATE.DISABLED) + self.assertTrue(is_disabled, "Install button should start disabled") + + def test_current_version_displayed(self): + """Test that current OS version is displayed correctly.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Find version label + version_label = find_label_with_text(screen, "Installed OS version:") + self.assertIsNotNone(version_label, "Version label not found") + + # Check that it contains the current version + label_text = version_label.get_text() + current_version = mpos.info.CURRENT_OS_VERSION + self.assertIn(current_version, label_text, + f"Current version {current_version} not in label text: {label_text}") + + def test_initial_status_message_without_wifi(self): + """Test status message when wifi is not connected.""" + # This test assumes desktop mode where wifi check returns True + # On actual hardware without wifi, it would show error + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Should show either "Checking for OS updates..." or update info + # or wifi error depending on network state + checking_found = verify_text_present(screen, "Checking") or \ + verify_text_present(screen, "version") or \ + verify_text_present(screen, "WiFi") + self.assertTrue(checking_found, "Should show some status message") + + def test_screenshot_initial_state(self): + """Capture screenshot of initial app state.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(20) + + screen = lv.screen_active() + + # Print labels for debugging + print("\n=== OSUpdate Initial State Labels ===") + print_screen_labels(screen) + + # Capture screenshot + screenshot_path = f"{self.screenshot_dir}/osupdate_initial_{self.hardware_id}.raw" + capture_screenshot(screenshot_path) + print(f"Screenshot saved to: {screenshot_path}") + + +class TestOSUpdateGraphicalStatusMessages(unittest.TestCase): + """Graphical tests for OSUpdate status messages.""" + + def setUp(self): + """Set up test fixtures.""" + self.hardware_id = mpos.info.get_hardware_id() + self.screenshot_dir = "tests/screenshots" + + try: + os.stat(self.screenshot_dir) + except OSError: + try: + os.mkdir(self.screenshot_dir) + except OSError: + pass + + def tearDown(self): + """Clean up after test.""" + mpos.ui.back_screen() + wait_for_render(5) + + def test_status_label_exists(self): + """Test that status label is created and visible.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Status label should exist and show some text + # Look for common status messages + has_status = ( + verify_text_present(screen, "Checking") or + verify_text_present(screen, "version") or + verify_text_present(screen, "WiFi") or + verify_text_present(screen, "Error") or + verify_text_present(screen, "Update") + ) + self.assertTrue(has_status, "Status label should be present with some message") + + def test_all_labels_readable(self): + """Test that all labels are readable (no truncation issues).""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(15) + + screen = lv.screen_active() + + # Print all labels to verify they're readable + print("\n=== All OSUpdate Labels ===") + print_screen_labels(screen) + + # At minimum, should have version label + version_found = verify_text_present(screen, "Installed OS version") + self.assertTrue(version_found, "Version label should be present and readable") + + +class TestOSUpdateGraphicalScreenshots(unittest.TestCase): + """Screenshot tests for visual regression testing.""" + + def setUp(self): + """Set up test fixtures.""" + self.hardware_id = mpos.info.get_hardware_id() + self.screenshot_dir = "tests/screenshots" + + try: + os.stat(self.screenshot_dir) + except OSError: + try: + os.mkdir(self.screenshot_dir) + except OSError: + pass + + def tearDown(self): + """Clean up after test.""" + mpos.ui.back_screen() + wait_for_render(5) + + def test_capture_main_screen(self): + """Capture screenshot of main OSUpdate screen.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(20) + + screenshot_path = f"{self.screenshot_dir}/osupdate_main_{self.hardware_id}.raw" + capture_screenshot(screenshot_path) + + # Verify file was created + try: + stat = os.stat(screenshot_path) + self.assertTrue(stat[6] > 0, "Screenshot file should not be empty") + except OSError: + self.fail(f"Screenshot file not created: {screenshot_path}") + + def test_capture_with_labels_visible(self): + """Capture screenshot ensuring all text is visible.""" + result = mpos.apps.start_app("com.micropythonos.osupdate") + self.assertTrue(result) + wait_for_render(20) + + screen = lv.screen_active() + + # Verify key elements are visible before screenshot (case insensitive) + has_version = verify_text_present(screen, "Installed") or verify_text_present(screen, "version") + has_force = verify_text_present(screen, "Force") or verify_text_present(screen, "force") + has_button = verify_text_present(screen, "Update") or verify_text_present(screen, "update") + + self.assertTrue(has_version, "Version label should be visible") + self.assertTrue(has_force, "Force checkbox should be visible") + self.assertTrue(has_button, "Update button should be visible") + + screenshot_path = f"{self.screenshot_dir}/osupdate_labeled_{self.hardware_id}.raw" + capture_screenshot(screenshot_path) + + +if __name__ == '__main__': + unittest.main()