From aa1b358facf15335f2baecef0958b2530f5debec Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Tue, 18 Nov 2025 12:04:00 +0100 Subject: [PATCH] Add unit tests --- CHANGELOG.md | 10 +- tests/network_test_helper.py | 661 +++++++++++++++++++++++++++++ tests/test_connectivity_manager.py | 638 ++++++++++++++++++++++++++++ tests/test_osupdate.py | 107 +---- 4 files changed, 1309 insertions(+), 107 deletions(-) create mode 100644 tests/network_test_helper.py create mode 100644 tests/test_connectivity_manager.py diff --git a/CHANGELOG.md b/CHANGELOG.md index fdff8b95..85f61a1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,15 +1,15 @@ -0.4.0 (unreleased) +0.4.0 ===== - Add custom MposKeyboard with more than 50% bigger buttons, great for tiny touch screens! - Apply theme changes (dark mode, color) immediately after saving - About app: add a bit more info -- Camera app: fix one-in-two "camera image stays blank" issue +- Camera app: fix one-in-two 'camera image stays blank' issue - OSUpdate app: enable scrolling with joystick/arrow keys - OSUpdate app: Major rework with improved reliability and user experience - - add WiFi monitoring - shows "Waiting for WiFi..." instead of error when no connection + - add WiFi monitoring - shows 'Waiting for WiFi...' instead of error when no connection - add automatic pause/resume on WiFi loss during downloads using HTTP Range headers - add user-friendly error messages with specific guidance for each error type - - add "Check Again" button for easy retry after errors + - add 'Check Again' button for easy retry after errors - add state machine for better app state management - add comprehensive test coverage (42 tests: 31 unit tests + 11 graphical tests) - refactor code into testable components (NetworkMonitor, UpdateChecker, UpdateDownloader) @@ -17,7 +17,7 @@ - improve timeout handling (5-minute wait for WiFi with clear messaging) - Tests: add test infrastructure with mock classes for network, HTTP, and partition operations - Tests: add graphical test helper utilities for UI verification and screenshot capture -- API: change "display" to mpos.ui.main_display +- API: change 'display' to mpos.ui.main_display - API: change mpos.ui.th to mpos.ui.task_handler - waveshare-esp32-s3-touch-lcd-2: power off camera at boot to conserve power - waveshare-esp32-s3-touch-lcd-2: increase touch screen input clock frequency from 100kHz to 400kHz diff --git a/tests/network_test_helper.py b/tests/network_test_helper.py new file mode 100644 index 00000000..e3f13d31 --- /dev/null +++ b/tests/network_test_helper.py @@ -0,0 +1,661 @@ +""" +Network testing helper module for MicroPythonOS. + +This module provides mock implementations of network-related modules +for testing without requiring actual network connectivity. These mocks +are designed to be used with dependency injection in the classes being tested. + +Usage: + from network_test_helper import MockNetwork, MockRequests, MockTimer + + # Create mocks + mock_network = MockNetwork(connected=True) + mock_requests = MockRequests() + + # Configure mock responses + mock_requests.set_next_response(status_code=200, text='{"key": "value"}') + + # Pass to class being tested + obj = MyClass(network_module=mock_network, requests_module=mock_requests) + + # Test behavior + result = obj.fetch_data() + assert mock_requests.last_url == "http://expected.url" +""" + +import time + + +class MockNetwork: + """ + Mock network module for testing network connectivity. + + Simulates the MicroPython 'network' module with WLAN interface. + """ + + STA_IF = 0 # Station interface constant + AP_IF = 1 # Access Point interface constant + + class MockWLAN: + """Mock WLAN interface.""" + + def __init__(self, interface, connected=True): + self.interface = interface + self._connected = connected + self._config = {} + + def isconnected(self): + """Return whether the WLAN is connected.""" + return self._connected + + def active(self, is_active=None): + """Get/set whether the interface is active.""" + if is_active is None: + return self._connected + self._active = is_active + + def connect(self, ssid, password): + """Simulate connecting to a network.""" + self._connected = True + self._config['ssid'] = ssid + + def disconnect(self): + """Simulate disconnecting from network.""" + self._connected = False + + def config(self, param): + """Get configuration parameter.""" + return self._config.get(param) + + def ifconfig(self): + """Get IP configuration.""" + if self._connected: + return ('192.168.1.100', '255.255.255.0', '192.168.1.1', '8.8.8.8') + return ('0.0.0.0', '0.0.0.0', '0.0.0.0', '0.0.0.0') + + def __init__(self, connected=True): + """ + Initialize mock network module. + + Args: + connected: Initial connection state (default: True) + """ + self._connected = connected + self._wlan_instances = {} + + def WLAN(self, interface): + """ + Create or return a WLAN interface. + + Args: + interface: Interface type (STA_IF or AP_IF) + + Returns: + MockWLAN instance + """ + if interface not in self._wlan_instances: + self._wlan_instances[interface] = self.MockWLAN(interface, self._connected) + return self._wlan_instances[interface] + + def set_connected(self, connected): + """ + Change the connection state of all WLAN interfaces. + + Args: + connected: New connection state + """ + self._connected = connected + for wlan in self._wlan_instances.values(): + wlan._connected = connected + + +class MockRaw: + """ + Mock raw HTTP response for streaming. + + Simulates the 'raw' attribute of requests.Response for chunked reading. + """ + + def __init__(self, content): + """ + Initialize mock raw response. + + Args: + content: Binary content to stream + """ + self.content = content + self.position = 0 + + def read(self, size): + """ + Read a chunk of data. + + Args: + size: Number of bytes to read + + Returns: + bytes: Chunk of data (may be smaller than size at end of stream) + """ + chunk = self.content[self.position:self.position + size] + self.position += len(chunk) + return chunk + + +class MockResponse: + """ + Mock HTTP response. + + Simulates requests.Response object with status code, text, headers, etc. + """ + + def __init__(self, status_code=200, text='', headers=None, content=b''): + """ + Initialize mock response. + + Args: + status_code: HTTP status code (default: 200) + text: Response text content (default: '') + headers: Response headers dict (default: {}) + content: Binary response content (default: b'') + """ + self.status_code = status_code + self.text = text + self.headers = headers or {} + self.content = content + self._closed = False + + # Mock raw attribute for streaming + self.raw = MockRaw(content) + + def close(self): + """Close the response.""" + self._closed = True + + def json(self): + """Parse response as JSON.""" + import json + return json.loads(self.text) + + +class MockRequests: + """ + Mock requests module for testing HTTP operations. + + Provides configurable mock responses and exception injection for testing + HTTP client code without making actual network requests. + """ + + def __init__(self): + """Initialize mock requests module.""" + self.last_url = None + self.last_headers = None + self.last_timeout = None + self.last_stream = None + self.next_response = None + self.raise_exception = None + self.call_history = [] + + def get(self, url, stream=False, timeout=None, headers=None): + """ + Mock GET request. + + Args: + url: URL to fetch + stream: Whether to stream the response + timeout: Request timeout in seconds + headers: Request headers dict + + Returns: + MockResponse object + + Raises: + Exception: If an exception was configured via set_exception() + """ + self.last_url = url + self.last_headers = headers + self.last_timeout = timeout + self.last_stream = stream + + # Record call in history + self.call_history.append({ + 'method': 'GET', + 'url': url, + 'stream': stream, + 'timeout': timeout, + 'headers': headers + }) + + if self.raise_exception: + exc = self.raise_exception + self.raise_exception = None # Clear after raising + raise exc + + if self.next_response: + response = self.next_response + self.next_response = None # Clear after returning + return response + + # Default response + return MockResponse() + + def post(self, url, data=None, json=None, timeout=None, headers=None): + """ + Mock POST request. + + Args: + url: URL to post to + data: Form data to send + json: JSON data to send + timeout: Request timeout in seconds + headers: Request headers dict + + Returns: + MockResponse object + + Raises: + Exception: If an exception was configured via set_exception() + """ + self.last_url = url + self.last_headers = headers + self.last_timeout = timeout + + # Record call in history + self.call_history.append({ + 'method': 'POST', + 'url': url, + 'data': data, + 'json': json, + 'timeout': timeout, + 'headers': headers + }) + + if self.raise_exception: + exc = self.raise_exception + self.raise_exception = None + raise exc + + if self.next_response: + response = self.next_response + self.next_response = None + return response + + return MockResponse() + + def set_next_response(self, status_code=200, text='', headers=None, content=b''): + """ + Configure the next response to return. + + Args: + status_code: HTTP status code (default: 200) + text: Response text (default: '') + headers: Response headers dict (default: {}) + content: Binary response content (default: b'') + + Returns: + MockResponse: The configured response object + """ + self.next_response = MockResponse(status_code, text, headers, content) + return self.next_response + + def set_exception(self, exception): + """ + Configure an exception to raise on the next request. + + Args: + exception: Exception instance to raise + """ + self.raise_exception = exception + + def clear_history(self): + """Clear the call history.""" + self.call_history = [] + + +class MockJSON: + """ + Mock JSON module for testing JSON parsing. + + Allows injection of parse errors for testing error handling. + """ + + def __init__(self): + """Initialize mock JSON module.""" + self.raise_exception = None + + def loads(self, text): + """ + Parse JSON string. + + Args: + text: JSON string to parse + + Returns: + Parsed JSON object + + Raises: + Exception: If an exception was configured via set_exception() + """ + if self.raise_exception: + exc = self.raise_exception + self.raise_exception = None + raise exc + + # Use Python's real json module for actual parsing + import json + return json.loads(text) + + def dumps(self, obj): + """ + Serialize object to JSON string. + + Args: + obj: Object to serialize + + Returns: + str: JSON string + """ + import json + return json.dumps(obj) + + def set_exception(self, exception): + """ + Configure an exception to raise on the next loads() call. + + Args: + exception: Exception instance to raise + """ + self.raise_exception = exception + + +class MockTimer: + """ + Mock Timer for testing periodic callbacks. + + Simulates machine.Timer without actual delays. Useful for testing + code that uses timers for periodic tasks. + """ + + # Class-level registry of all timers + _all_timers = {} + _next_timer_id = 0 + + PERIODIC = 1 + ONE_SHOT = 0 + + def __init__(self, timer_id): + """ + Initialize mock timer. + + Args: + timer_id: Timer ID (0-3 on most MicroPython platforms) + """ + self.timer_id = timer_id + self.callback = None + self.period = None + self.mode = None + self.active = False + MockTimer._all_timers[timer_id] = self + + def init(self, period=None, mode=None, callback=None): + """ + Initialize/configure the timer. + + Args: + period: Timer period in milliseconds + mode: Timer mode (PERIODIC or ONE_SHOT) + callback: Callback function to call on timer fire + """ + self.period = period + self.mode = mode + self.callback = callback + self.active = True + + def deinit(self): + """Deinitialize the timer.""" + self.active = False + self.callback = None + + def trigger(self, *args, **kwargs): + """ + Manually trigger the timer callback (for testing). + + Args: + *args: Arguments to pass to callback + **kwargs: Keyword arguments to pass to callback + """ + if self.callback and self.active: + self.callback(*args, **kwargs) + + @classmethod + def get_timer(cls, timer_id): + """ + Get a timer by ID. + + Args: + timer_id: Timer ID to retrieve + + Returns: + MockTimer instance or None if not found + """ + return cls._all_timers.get(timer_id) + + @classmethod + def trigger_all(cls): + """Trigger all active timers (for testing).""" + for timer in cls._all_timers.values(): + if timer.active: + timer.trigger() + + @classmethod + def reset_all(cls): + """Reset all timers (clear registry).""" + cls._all_timers.clear() + + +class MockSocket: + """ + Mock socket for testing socket operations. + + Simulates usocket module without actual network I/O. + """ + + AF_INET = 2 + SOCK_STREAM = 1 + + def __init__(self, af=None, sock_type=None): + """ + Initialize mock socket. + + Args: + af: Address family (AF_INET, etc.) + sock_type: Socket type (SOCK_STREAM, etc.) + """ + self.af = af + self.sock_type = sock_type + self.connected = False + self.bound = False + self.listening = False + self.address = None + self.port = None + self._send_exception = None + self._recv_data = b'' + self._recv_position = 0 + + def connect(self, address): + """ + Simulate connecting to an address. + + Args: + address: Tuple of (host, port) + """ + self.connected = True + self.address = address + + def bind(self, address): + """ + Simulate binding to an address. + + Args: + address: Tuple of (host, port) + """ + self.bound = True + self.address = address + + def listen(self, backlog): + """ + Simulate listening for connections. + + Args: + backlog: Maximum number of queued connections + """ + self.listening = True + + def send(self, data): + """ + Simulate sending data. + + Args: + data: Bytes to send + + Returns: + int: Number of bytes sent + + Raises: + Exception: If configured via set_send_exception() + """ + if self._send_exception: + exc = self._send_exception + self._send_exception = None + raise exc + return len(data) + + def recv(self, size): + """ + Simulate receiving data. + + Args: + size: Maximum bytes to receive + + Returns: + bytes: Received data + """ + chunk = self._recv_data[self._recv_position:self._recv_position + size] + self._recv_position += len(chunk) + return chunk + + def close(self): + """Close the socket.""" + self.connected = False + + def set_send_exception(self, exception): + """ + Configure an exception to raise on next send(). + + Args: + exception: Exception instance to raise + """ + self._send_exception = exception + + def set_recv_data(self, data): + """ + Configure data to return from recv(). + + Args: + data: Bytes to return from recv() calls + """ + self._recv_data = data + self._recv_position = 0 + + +def socket(af=MockSocket.AF_INET, sock_type=MockSocket.SOCK_STREAM): + """ + Create a mock socket. + + Args: + af: Address family (default: AF_INET) + sock_type: Socket type (default: SOCK_STREAM) + + Returns: + MockSocket instance + """ + return MockSocket(af, sock_type) + + +class MockTime: + """ + Mock time module for testing time-dependent code. + + Allows manual control of time progression for deterministic testing. + """ + + def __init__(self, start_time=0): + """ + Initialize mock time module. + + Args: + start_time: Initial time in milliseconds (default: 0) + """ + self._current_time_ms = start_time + self._sleep_calls = [] + + def ticks_ms(self): + """ + Get current time in milliseconds. + + Returns: + int: Current time in milliseconds + """ + return self._current_time_ms + + def ticks_diff(self, ticks1, ticks2): + """ + Calculate difference between two tick values. + + Args: + ticks1: End time + ticks2: Start time + + Returns: + int: Difference in milliseconds + """ + return ticks1 - ticks2 + + def sleep(self, seconds): + """ + Simulate sleep (doesn't actually sleep). + + Args: + seconds: Number of seconds to sleep + """ + self._sleep_calls.append(seconds) + + def sleep_ms(self, milliseconds): + """ + Simulate sleep in milliseconds. + + Args: + milliseconds: Number of milliseconds to sleep + """ + self._sleep_calls.append(milliseconds / 1000.0) + + def advance(self, milliseconds): + """ + Advance the mock time. + + Args: + milliseconds: Number of milliseconds to advance + """ + self._current_time_ms += milliseconds + + def get_sleep_calls(self): + """ + Get history of sleep calls. + + Returns: + list: List of sleep durations in seconds + """ + return self._sleep_calls + + def clear_sleep_calls(self): + """Clear the sleep call history.""" + self._sleep_calls = [] diff --git a/tests/test_connectivity_manager.py b/tests/test_connectivity_manager.py new file mode 100644 index 00000000..edb854db --- /dev/null +++ b/tests/test_connectivity_manager.py @@ -0,0 +1,638 @@ +import unittest +import sys + +# Add parent directory to path so we can import network_test_helper +# When running from unittest.sh, we're in internal_filesystem/, so tests/ is ../tests/ +sys.path.insert(0, '../tests') + +# Import our network test helpers +from network_test_helper import MockNetwork, MockTimer, MockTime, MockRequests, MockSocket + +# Mock machine module with Timer +class MockMachine: + """Mock machine module.""" + Timer = MockTimer + +# Mock usocket module +class MockUsocket: + """Mock usocket module.""" + AF_INET = MockSocket.AF_INET + SOCK_STREAM = MockSocket.SOCK_STREAM + + @staticmethod + def socket(af, sock_type): + return MockSocket(af, sock_type) + +# Inject mocks into sys.modules BEFORE importing connectivity_manager +sys.modules['machine'] = MockMachine +sys.modules['usocket'] = MockUsocket + +# Mock requests module +mock_requests = MockRequests() +sys.modules['requests'] = mock_requests + + +class TestConnectivityManagerWithNetwork(unittest.TestCase): + """Test ConnectivityManager with network module available.""" + + def setUp(self): + """Set up test fixtures.""" + # Create a mock network module + self.mock_network = MockNetwork(connected=True) + + # Mock the network module globally BEFORE importing + sys.modules['network'] = self.mock_network + + # Now import after network is mocked + # Need to reload the module to pick up the new network module + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + # Import fresh + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + + # Reset the singleton instance + ConnectivityManager._instance = None + + # Reset all mock timers + MockTimer.reset_all() + + def tearDown(self): + """Clean up after test.""" + # Reset singleton + if hasattr(self, 'ConnectivityManager'): + self.ConnectivityManager._instance = None + + # Clean up mocks + if 'network' in sys.modules: + del sys.modules['network'] + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + MockTimer.reset_all() + + def test_singleton_pattern(self): + """Test that ConnectivityManager is a singleton via get().""" + # Using get() should return the same instance + cm1 = self.ConnectivityManager.get() + cm2 = self.ConnectivityManager.get() + cm3 = self.ConnectivityManager.get() + + # All should be the same instance + self.assertEqual(id(cm1), id(cm2)) + self.assertEqual(id(cm2), id(cm3)) + + def test_initialization_with_network_module(self): + """Test initialization when network module is available.""" + cm = self.ConnectivityManager() + + # Should have network checking capability + self.assertTrue(cm.can_check_network) + + # Should have created WLAN instance + self.assertIsNotNone(cm.wlan) + + # Should have created timer + timer = MockTimer.get_timer(1) + self.assertIsNotNone(timer) + self.assertTrue(timer.active) + self.assertEqual(timer.period, 8000) + self.assertEqual(timer.mode, MockTimer.PERIODIC) + + def test_initial_connection_state_when_connected(self): + """Test initial state when network is connected.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + # Should detect connection during initialization + self.assertTrue(cm.is_online()) + + def test_initial_connection_state_when_disconnected(self): + """Test initial state when network is disconnected.""" + self.mock_network.set_connected(False) + cm = self.ConnectivityManager() + + # Should detect disconnection during initialization + self.assertFalse(cm.is_online()) + + def test_callback_registration(self): + """Test registering callbacks.""" + cm = self.ConnectivityManager() + + callback_called = [] + def my_callback(online): + callback_called.append(online) + + cm.register_callback(my_callback) + + # Callback should be in the list + self.assertTrue(my_callback in cm.callbacks) + + # Registering again should not duplicate + cm.register_callback(my_callback) + self.assertEqual(cm.callbacks.count(my_callback), 1) + + def test_callback_unregistration(self): + """Test unregistering callbacks.""" + cm = self.ConnectivityManager() + + def callback1(online): + pass + + def callback2(online): + pass + + cm.register_callback(callback1) + cm.register_callback(callback2) + + # Both should be registered + self.assertTrue(callback1 in cm.callbacks) + self.assertTrue(callback2 in cm.callbacks) + + # Unregister callback1 + cm.unregister_callback(callback1) + + # Only callback2 should remain + self.assertFalse(callback1 in cm.callbacks) + self.assertTrue(callback2 in cm.callbacks) + + def test_callback_notification_on_state_change(self): + """Test that callbacks are notified when state changes.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + notifications = [] + def my_callback(online): + notifications.append(online) + + cm.register_callback(my_callback) + + # Simulate going offline + self.mock_network.set_connected(False) + + # Trigger periodic check (timer passes itself as first arg) + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # Should have been notified of offline state + self.assertEqual(len(notifications), 1) + self.assertFalse(notifications[0]) + + # Simulate going back online + self.mock_network.set_connected(True) + timer.callback(timer) + + # Should have been notified of online state + self.assertEqual(len(notifications), 2) + self.assertTrue(notifications[1]) + + def test_callback_notification_not_sent_when_state_unchanged(self): + """Test that callbacks are not notified when state doesn't change.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + notifications = [] + def my_callback(online): + notifications.append(online) + + cm.register_callback(my_callback) + + # Trigger periodic check while still connected + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # Should not have been notified (state didn't change) + self.assertEqual(len(notifications), 0) + + def test_periodic_check_detects_connection_change(self): + """Test that periodic check detects connection state changes.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + # Should be online initially + self.assertTrue(cm.is_online()) + + # Simulate disconnection + self.mock_network.set_connected(False) + + # Trigger periodic check + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # Should now be offline + self.assertFalse(cm.is_online()) + + # Reconnect + self.mock_network.set_connected(True) + timer.callback(timer) + + # Should be online again + self.assertTrue(cm.is_online()) + + def test_callback_exception_handling(self): + """Test that exceptions in callbacks don't break the manager.""" + cm = self.ConnectivityManager() + + notifications = [] + + def bad_callback(online): + raise Exception("Callback error!") + + def good_callback(online): + notifications.append(online) + + cm.register_callback(bad_callback) + cm.register_callback(good_callback) + + # Trigger state change + self.mock_network.set_connected(False) + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # Good callback should still have been called despite bad callback + self.assertEqual(len(notifications), 1) + self.assertFalse(notifications[0]) + + def test_multiple_callbacks(self): + """Test multiple callbacks are all notified.""" + cm = self.ConnectivityManager() + + notifications1 = [] + notifications2 = [] + notifications3 = [] + + cm.register_callback(lambda online: notifications1.append(online)) + cm.register_callback(lambda online: notifications2.append(online)) + cm.register_callback(lambda online: notifications3.append(online)) + + # Trigger state change + self.mock_network.set_connected(False) + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # All callbacks should have been notified + self.assertEqual(len(notifications1), 1) + self.assertEqual(len(notifications2), 1) + self.assertEqual(len(notifications3), 1) + + self.assertFalse(notifications1[0]) + self.assertFalse(notifications2[0]) + self.assertFalse(notifications3[0]) + + def test_is_wifi_connected(self): + """Test is_wifi_connected() method.""" + cm = self.ConnectivityManager() + + # is_connected is set to False during init for platforms with network module + # It's only set to True for platforms without network module (desktop) + self.assertFalse(cm.is_wifi_connected()) + + +class TestConnectivityManagerWithoutNetwork(unittest.TestCase): + """Test ConnectivityManager without network module (desktop mode).""" + + def setUp(self): + """Set up test fixtures.""" + # Remove network module to simulate desktop environment + if 'network' in sys.modules: + del sys.modules['network'] + + # Reload the module without network + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + + # Reset the singleton instance + ConnectivityManager._instance = None + + # Reset timers + MockTimer.reset_all() + + def tearDown(self): + """Clean up after test.""" + if hasattr(self, 'ConnectivityManager'): + self.ConnectivityManager._instance = None + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + MockTimer.reset_all() + + def test_initialization_without_network_module(self): + """Test initialization when network module is not available.""" + cm = self.ConnectivityManager() + + # Should NOT have network checking capability + self.assertFalse(cm.can_check_network) + + # Should not have WLAN instance + self.assertIsNone(cm.wlan) + + # Should still create timer + timer = MockTimer.get_timer(1) + self.assertIsNotNone(timer) + + def test_always_online_without_network_module(self): + """Test that manager assumes always online without network module.""" + cm = self.ConnectivityManager() + + # Should assume connected + self.assertTrue(cm.is_connected) + + # Should assume online + self.assertTrue(cm.is_online()) + + def test_periodic_check_without_network_module(self): + """Test periodic check when there's no network module.""" + cm = self.ConnectivityManager() + + # Trigger periodic check + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # Should still be online + self.assertTrue(cm.is_online()) + + def test_callbacks_not_triggered_without_network(self): + """Test that callbacks aren't triggered when always online.""" + cm = self.ConnectivityManager() + + notifications = [] + cm.register_callback(lambda online: notifications.append(online)) + + # Trigger periodic checks + timer = MockTimer.get_timer(1) + for _ in range(5): + timer.callback(timer) + + # No notifications should have been sent (state never changed) + self.assertEqual(len(notifications), 0) + + +class TestConnectivityManagerWaitUntilOnline(unittest.TestCase): + """Test wait_until_online functionality.""" + + def setUp(self): + """Set up test fixtures.""" + # Create mock network + self.mock_network = MockNetwork(connected=False) + sys.modules['network'] = self.mock_network + + # Reload module + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + + ConnectivityManager._instance = None + MockTimer.reset_all() + + def tearDown(self): + """Clean up after test.""" + if hasattr(self, 'ConnectivityManager'): + self.ConnectivityManager._instance = None + MockTimer.reset_all() + if 'network' in sys.modules: + del sys.modules['network'] + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + def test_wait_until_online_already_online(self): + """Test wait_until_online when already online.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + # Should return immediately + result = cm.wait_until_online(timeout=5) + self.assertTrue(result) + + def test_wait_until_online_without_network_module(self): + """Test wait_until_online without network module (desktop).""" + # Remove network module + if 'network' in sys.modules: + del sys.modules['network'] + + # Reload module + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + ConnectivityManager._instance = None + + cm = self.ConnectivityManager() + + # Should return True immediately (always online) + result = cm.wait_until_online(timeout=5) + self.assertTrue(result) + + +class TestConnectivityManagerEdgeCases(unittest.TestCase): + """Test edge cases and error conditions.""" + + def setUp(self): + """Set up test fixtures.""" + self.mock_network = MockNetwork(connected=True) + sys.modules['network'] = self.mock_network + + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + + ConnectivityManager._instance = None + MockTimer.reset_all() + + def tearDown(self): + """Clean up after test.""" + if hasattr(self, 'ConnectivityManager'): + self.ConnectivityManager._instance = None + MockTimer.reset_all() + if 'network' in sys.modules: + del sys.modules['network'] + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + def test_initialization_creates_timer(self): + """Test that initialization creates periodic timer.""" + cm = self.ConnectivityManager() + + # Timer should exist + timer = MockTimer.get_timer(1) + self.assertIsNotNone(timer) + + # Timer should be configured correctly + self.assertEqual(timer.period, 8000) # 8 seconds + self.assertEqual(timer.mode, MockTimer.PERIODIC) + self.assertTrue(timer.active) + + def test_get_creates_instance_if_not_exists(self): + """Test that get() creates instance if it doesn't exist.""" + # Ensure no instance exists + self.assertIsNone(self.ConnectivityManager._instance) + + # get() should create one + cm = self.ConnectivityManager.get() + self.assertIsNotNone(cm) + + # Subsequent get() should return same instance + cm2 = self.ConnectivityManager.get() + self.assertEqual(id(cm), id(cm2)) + + def test_periodic_check_does_not_notify_on_init(self): + """Test periodic check doesn't notify during initialization.""" + self.mock_network.set_connected(False) + + # Register callback AFTER creating instance to observe later notifications + cm = self.ConnectivityManager() + + notifications = [] + cm.register_callback(lambda online: notifications.append(online)) + + # No notifications yet (initial check had notify=False) + self.assertEqual(len(notifications), 0) + + def test_unregister_nonexistent_callback(self): + """Test unregistering a callback that was never registered.""" + cm = self.ConnectivityManager() + + def my_callback(online): + pass + + # Should not raise an exception + cm.unregister_callback(my_callback) + + # Callbacks should be empty + self.assertEqual(len(cm.callbacks), 0) + + def test_online_offline_online_transitions(self): + """Test multiple state transitions.""" + self.mock_network.set_connected(True) + cm = self.ConnectivityManager() + + notifications = [] + cm.register_callback(lambda online: notifications.append(online)) + + timer = MockTimer.get_timer(1) + + # Go offline + self.mock_network.set_connected(False) + timer.callback(timer) + self.assertFalse(cm.is_online()) + self.assertEqual(notifications[-1], False) + + # Go online + self.mock_network.set_connected(True) + timer.callback(timer) + self.assertTrue(cm.is_online()) + self.assertEqual(notifications[-1], True) + + # Go offline again + self.mock_network.set_connected(False) + timer.callback(timer) + self.assertFalse(cm.is_online()) + self.assertEqual(notifications[-1], False) + + # Should have 3 notifications + self.assertEqual(len(notifications), 3) + + +class TestConnectivityManagerIntegration(unittest.TestCase): + """Integration tests for ConnectivityManager.""" + + def setUp(self): + """Set up test fixtures.""" + self.mock_network = MockNetwork(connected=True) + sys.modules['network'] = self.mock_network + + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + from mpos.net.connectivity_manager import ConnectivityManager + self.ConnectivityManager = ConnectivityManager + + ConnectivityManager._instance = None + MockTimer.reset_all() + + def tearDown(self): + """Clean up after test.""" + if hasattr(self, 'ConnectivityManager'): + self.ConnectivityManager._instance = None + MockTimer.reset_all() + if 'network' in sys.modules: + del sys.modules['network'] + if 'mpos.net.connectivity_manager' in sys.modules: + del sys.modules['mpos.net.connectivity_manager'] + + def test_realistic_usage_scenario(self): + """Test a realistic usage scenario.""" + # App starts, creates connectivity manager + cm = self.ConnectivityManager.get() + + # App registers callback to update UI + ui_state = {'online': True} + def update_ui(online): + ui_state['online'] = online + + cm.register_callback(update_ui) + + # Initially online + self.assertTrue(cm.is_online()) + self.assertTrue(ui_state['online']) + + # User moves out of WiFi range + self.mock_network.set_connected(False) + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # UI should reflect offline state + self.assertFalse(cm.is_online()) + self.assertFalse(ui_state['online']) + + # User returns to WiFi range + self.mock_network.set_connected(True) + timer.callback(timer) + + # UI should reflect online state + self.assertTrue(cm.is_online()) + self.assertTrue(ui_state['online']) + + # App closes, unregisters callback + cm.unregister_callback(update_ui) + + # Callback should be removed + self.assertFalse(update_ui in cm.callbacks) + + def test_multiple_apps_using_connectivity_manager(self): + """Test multiple apps/components using the same manager.""" + cm = self.ConnectivityManager.get() + + # Three different apps register callbacks + app1_state = [] + app2_state = [] + app3_state = [] + + cm.register_callback(lambda online: app1_state.append(online)) + cm.register_callback(lambda online: app2_state.append(online)) + cm.register_callback(lambda online: app3_state.append(online)) + + # Network goes offline + self.mock_network.set_connected(False) + timer = MockTimer.get_timer(1) + timer.callback(timer) + + # All apps should be notified + self.assertEqual(len(app1_state), 1) + self.assertEqual(len(app2_state), 1) + self.assertEqual(len(app3_state), 1) + + # All should see offline state + self.assertFalse(app1_state[0]) + self.assertFalse(app2_state[0]) + self.assertFalse(app3_state[0]) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_osupdate.py b/tests/test_osupdate.py index e8b36c8b..88400c5f 100644 --- a/tests/test_osupdate.py +++ b/tests/test_osupdate.py @@ -1,109 +1,12 @@ import unittest import sys -# Mock classes for testing -class MockNetwork: - """Mock network module for testing NetworkMonitor.""" +# Add parent directory to path so we can import network_test_helper +# When running from unittest.sh, we're in internal_filesystem/, so tests/ is ../tests/ +sys.path.insert(0, '../tests') - STA_IF = 0 # Station interface constant - - class MockWLAN: - def __init__(self, interface): - self.interface = interface - self._connected = True # Default to connected - - def isconnected(self): - return self._connected - - def __init__(self, connected=True): - self._connected = connected - - def WLAN(self, interface): - wlan = self.MockWLAN(interface) - wlan._connected = self._connected - return wlan - - def set_connected(self, connected): - """Helper to change connection state.""" - self._connected = connected - - -class MockRaw: - """Mock raw response for streaming.""" - def __init__(self, content): - self.content = content - self.position = 0 - - def read(self, size): - chunk = self.content[self.position:self.position + size] - self.position += len(chunk) - return chunk - - -class MockResponse: - """Mock HTTP response.""" - def __init__(self, status_code=200, text='', headers=None, content=b''): - self.status_code = status_code - self.text = text - self.headers = headers or {} - self.content = content - self._closed = False - - # Mock raw attribute for streaming - self.raw = MockRaw(content) - - def close(self): - self._closed = True - - -class MockRequests: - """Mock requests module for testing UpdateChecker and UpdateDownloader.""" - - def __init__(self): - self.last_url = None - self.next_response = None - self.raise_exception = None - - def get(self, url, stream=False, timeout=None, headers=None): - self.last_url = url - - if self.raise_exception: - raise self.raise_exception - - if self.next_response: - return self.next_response - - # Default response - return MockResponse() - - def set_next_response(self, status_code=200, text='', headers=None, content=b''): - """Helper to set what the next get() should return.""" - self.next_response = MockResponse(status_code, text, headers, content) - return self.next_response - - def set_exception(self, exception): - """Helper to make next get() raise an exception.""" - self.raise_exception = exception - - -class MockJSON: - """Mock JSON module for testing UpdateChecker.""" - - def __init__(self): - self.raise_exception = None - - def loads(self, text): - if self.raise_exception: - raise self.raise_exception - - # Very simple JSON parser for testing - # In real tests, we can just use Python's json module - import json - return json.loads(text) - - def set_exception(self, exception): - """Helper to make loads() raise an exception.""" - self.raise_exception = exception +# Import network test helpers +from network_test_helper import MockNetwork, MockRequests, MockJSON class MockPartition: