From df486a5a5d42bad327e5f6499137d16e7635d4a8 Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Mon, 24 Nov 2025 08:24:47 +0100 Subject: [PATCH] WifiService: connect to strongest networks first --- .gitignore | 5 + .../lib/mpos/net/wifi_service.py | 8 +- tests/test_wifi_service.py | 256 ++++++++++++++++++ tests/unittest.sh | 152 ----------- 4 files changed, 268 insertions(+), 153 deletions(-) delete mode 100755 tests/unittest.sh diff --git a/.gitignore b/.gitignore index e9462990..5e87af82 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,8 @@ internal_filesystem/data internal_filesystem/sdcard internal_filesystem/tests +internal_filesystem_excluded/ + # these tests contain actual NWC URLs: tests/manual_test_nwcwallet_alby.py tests/manual_test_nwcwallet_cashu.py @@ -21,3 +23,6 @@ __pycache__/ *$py.class *.so .Python + +# these get created: +c_mpos/c_mpos diff --git a/internal_filesystem/lib/mpos/net/wifi_service.py b/internal_filesystem/lib/mpos/net/wifi_service.py index c41a4bd3..bfa76188 100644 --- a/internal_filesystem/lib/mpos/net/wifi_service.py +++ b/internal_filesystem/lib/mpos/net/wifi_service.py @@ -46,6 +46,7 @@ class WifiService: def connect(network_module=None): """ Scan for available networks and connect to the first saved network found. + Networks are tried in order of signal strength (strongest first). Args: network_module: Network module for dependency injection (testing) @@ -63,9 +64,14 @@ class WifiService: # Scan for available networks networks = wlan.scan() + # Sort networks by RSSI (signal strength) in descending order + # RSSI is at index 3, higher values (less negative) = stronger signal + networks = sorted(networks, key=lambda n: n[3], reverse=True) + for n in networks: ssid = n[0].decode() - print(f"WifiService: Found network '{ssid}'") + rssi = n[3] + print(f"WifiService: Found network '{ssid}' (RSSI: {rssi} dBm)") if ssid in WifiService.access_points: password = WifiService.access_points.get(ssid).get("password") diff --git a/tests/test_wifi_service.py b/tests/test_wifi_service.py index 6583b4a7..705b85d4 100644 --- a/tests/test_wifi_service.py +++ b/tests/test_wifi_service.py @@ -455,3 +455,259 @@ class TestWifiServiceDisconnect(unittest.TestCase): WifiService.disconnect(network_module=None) +class TestWifiServiceRSSISorting(unittest.TestCase): + """Test RSSI-based network prioritization.""" + + def setUp(self): + """Set up test fixtures.""" + MockSharedPreferences.reset_all() + WifiService.access_points = {} + WifiService.wifi_busy = False + + def tearDown(self): + """Clean up after tests.""" + WifiService.access_points = {} + WifiService.wifi_busy = False + MockSharedPreferences.reset_all() + + def test_networks_sorted_by_rssi_strongest_first(self): + """Test that networks are sorted by RSSI with strongest first.""" + # Create mock networks with different RSSI values + # Format: (ssid, bssid, channel, rssi, security, hidden) + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + # Unsorted networks (weak, strong, medium) + mock_wlan._scan_results = [ + (b'WeakNetwork', b'\xaa\xbb\xcc\xdd\xee\xff', 6, -85, 3, False), + (b'StrongNetwork', b'\x11\x22\x33\x44\x55\x66', 1, -45, 3, False), + (b'MediumNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -65, 3, False), + ] + + # Configure all as saved networks + WifiService.access_points = { + 'WeakNetwork': {'password': 'weak123'}, + 'StrongNetwork': {'password': 'strong123'}, + 'MediumNetwork': {'password': 'medium123'} + } + + # Track connection attempts + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + # Succeed on first attempt + mock_wlan._connected = True + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertTrue(result) + # Should try strongest first (-45 dBm) + self.assertEqual(connection_attempts[0], 'StrongNetwork') + # Should only try one (first succeeds) + self.assertEqual(len(connection_attempts), 1) + + def test_multiple_networks_tried_in_rssi_order(self): + """Test that multiple networks are tried in RSSI order when first fails.""" + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + # Three networks with different signal strengths + mock_wlan._scan_results = [ + (b'BadNetwork1', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -40, 3, False), + (b'BadNetwork2', b'\x11\x22\x33\x44\x55\x66', 6, -50, 3, False), + (b'GoodNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -60, 3, False), + ] + + WifiService.access_points = { + 'BadNetwork1': {'password': 'pass1'}, + 'BadNetwork2': {'password': 'pass2'}, + 'GoodNetwork': {'password': 'pass3'} + } + + # Track attempts and make first two fail + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + # Only succeed on third attempt + if len(connection_attempts) >= 3: + mock_wlan._connected = True + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertTrue(result) + # Verify order: strongest to weakest + self.assertEqual(connection_attempts[0], 'BadNetwork1') # RSSI -40 + self.assertEqual(connection_attempts[1], 'BadNetwork2') # RSSI -50 + self.assertEqual(connection_attempts[2], 'GoodNetwork') # RSSI -60 + self.assertEqual(len(connection_attempts), 3) + + def test_duplicate_ssid_strongest_tried_first(self): + """Test that with duplicate SSIDs, strongest signal is tried first.""" + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + # Real-world scenario: Multiple APs with same SSID + mock_wlan._scan_results = [ + (b'MyNetwork', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -70, 3, False), + (b'MyNetwork', b'\x11\x22\x33\x44\x55\x66', 6, -50, 3, False), # Strongest + (b'MyNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -85, 3, False), + ] + + WifiService.access_points = { + 'MyNetwork': {'password': 'mypass123'} + } + + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + # Succeed on first + mock_wlan._connected = True + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertTrue(result) + # Should only try once (first is strongest and succeeds) + self.assertEqual(len(connection_attempts), 1) + self.assertEqual(connection_attempts[0], 'MyNetwork') + + def test_rssi_order_with_real_scan_data(self): + """Test with real scan data from actual ESP32 device.""" + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + # Real scan output from user's example + mock_wlan._scan_results = [ + (b'Channel 8', b'\xde\xec^\x8f\x00A', 11, -47, 3, False), + (b'Baptistus', b'\xd8\xec^\x8f\x00A', 11, -48, 7, False), + (b'telenet-BD74DC9', b'TgQ>t\xe7', 11, -70, 3, False), + (b'Galaxy S10+64bf', b'b\x19\xdf\xef\xb0\x8f', 11, -83, 3, False), + (b'Najeeb\xe2\x80\x99s iPhone', b"F\x07'\xb8\x0b0", 6, -84, 7, False), + (b'DIRECT-83-HP OfficeJet Pro 7740', b'\x1a`$dk\x83', 1, -87, 3, False), + (b'Channel 8', b'\xde\xec^\xe1#w', 1, -91, 3, False), + (b'Baptistus', b'\xd8\xec^\xe1#w', 1, -91, 7, False), + (b'Proximus-Home-596457', b'\xf4\x05\x95\xf9A\xf1', 1, -93, 3, False), + (b'Proximus-Home-596457', b'\xcc\x00\xf1j}\x94', 1, -93, 3, False), + (b'BASE-9104320', b'4,\xc4\xe7\x01\xb7', 1, -94, 3, False), + ] + + # Save several networks + WifiService.access_points = { + 'Channel 8': {'password': 'pass1'}, + 'Baptistus': {'password': 'pass2'}, + 'telenet-BD74DC9': {'password': 'pass3'}, + 'Galaxy S10+64bf': {'password': 'pass4'}, + } + + # Track attempts and fail first to see ordering + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + # Succeed on second attempt + if len(connection_attempts) >= 2: + mock_wlan._connected = True + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertTrue(result) + # Expected order: Channel 8 (-47), Baptistus (-48), telenet (-70), Galaxy (-83) + self.assertEqual(connection_attempts[0], 'Channel 8') + self.assertEqual(connection_attempts[1], 'Baptistus') + self.assertEqual(len(connection_attempts), 2) + + def test_sorting_preserves_network_data_integrity(self): + """Test that sorting doesn't corrupt or lose network data.""" + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + # Networks with various attributes + mock_wlan._scan_results = [ + (b'Net3', b'\xaa\xaa\xaa\xaa\xaa\xaa', 11, -90, 3, False), + (b'Net1', b'\xbb\xbb\xbb\xbb\xbb\xbb', 1, -40, 7, True), # Hidden + (b'Net2', b'\xcc\xcc\xcc\xcc\xcc\xcc', 6, -60, 2, False), + ] + + WifiService.access_points = { + 'Net1': {'password': 'p1'}, + 'Net2': {'password': 'p2'}, + 'Net3': {'password': 'p3'} + } + + # Track attempts to verify all are tried + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + # Never succeed, try all + pass + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertFalse(result) # No connection succeeded + # Verify all 3 were attempted in RSSI order + self.assertEqual(len(connection_attempts), 3) + self.assertEqual(connection_attempts[0], 'Net1') # RSSI -40 + self.assertEqual(connection_attempts[1], 'Net2') # RSSI -60 + self.assertEqual(connection_attempts[2], 'Net3') # RSSI -90 + + def test_no_saved_networks_in_scan(self): + """Test behavior when scan finds no saved networks.""" + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + mock_wlan._scan_results = [ + (b'UnknownNet1', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -50, 3, False), + (b'UnknownNet2', b'\x11\x22\x33\x44\x55\x66', 6, -60, 3, False), + ] + + WifiService.access_points = { + 'SavedNetwork': {'password': 'pass123'} + } + + connection_attempts = [] + + def mock_connect(ssid, password): + connection_attempts.append(ssid) + + mock_wlan.connect = mock_connect + + result = WifiService.connect(network_module=mock_network) + + self.assertFalse(result) + # No attempts should be made + self.assertEqual(len(connection_attempts), 0) + + def test_rssi_logging_shows_signal_strength(self): + """Test that RSSI value is logged during scan (for debugging).""" + # This is more of a documentation test to verify the log format + mock_network = MockNetwork(connected=False) + mock_wlan = mock_network.WLAN(mock_network.STA_IF) + + mock_wlan._scan_results = [ + (b'TestNet', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -55, 3, False), + ] + + WifiService.access_points = { + 'TestNet': {'password': 'pass'} + } + + # The connect method now logs "Found network 'TestNet' (RSSI: -55 dBm)" + # This test just verifies it doesn't crash + result = WifiService.connect(network_module=mock_network) + # Since mock doesn't actually connect, this will likely be False + # but the important part is the code runs without error + + diff --git a/tests/unittest.sh b/tests/unittest.sh deleted file mode 100755 index f93cc111..00000000 --- a/tests/unittest.sh +++ /dev/null @@ -1,152 +0,0 @@ -#!/bin/bash - -mydir=$(readlink -f "$0") -mydir=$(dirname "$mydir") -testdir="$mydir" -scriptdir=$(readlink -f "$mydir"/../scripts/) -fs="$mydir"/../internal_filesystem/ -mpremote="$mydir"/../lvgl_micropython/lib/micropython/tools/mpremote/mpremote.py - -# Parse arguments -ondevice="" -onetest="" - -while [ $# -gt 0 ]; do - case "$1" in - --ondevice) - ondevice="yes" - ;; - *) - onetest="$1" - ;; - esac - shift -done - -# print os and set binary -os_name=$(uname -s) -if [ "$os_name" = "Darwin" ]; then - echo "Running on macOS" - binary="$scriptdir"/../lvgl_micropython/build/lvgl_micropy_macOS -else - # other cases can be added here - echo "Running on $os_name" - binary="$scriptdir"/../lvgl_micropython/build/lvgl_micropy_unix -fi - -binary=$(readlink -f "$binary") -chmod +x "$binary" - -one_test() { - file="$1" - if [ ! -f "$file" ]; then - echo "ERROR: $file is not a regular, existing file!" - exit 1 - fi - pushd "$fs" - echo "Testing $file" - - # Detect if this is a graphical test (filename contains "graphical") - if echo "$file" | grep -q "graphical"; then - echo "Detected graphical test - including boot and main files" - is_graphical=1 - # Get absolute path to tests directory for imports - tests_abs_path=$(readlink -f "$testdir") - else - is_graphical=0 - fi - - if [ -z "$ondevice" ]; then - # Desktop execution - if [ $is_graphical -eq 1 ]; then - # Graphical test: include boot_unix.py and main.py - "$binary" -X heapsize=8M -c "$(cat main.py) ; import mpos.main ; import mpos.apps; sys.path.append(\"$tests_abs_path\") -$(cat $file) -result = unittest.main() ; sys.exit(0 if result.wasSuccessful() else 1) " - result=$? - else - # Regular test: no boot files - "$binary" -X heapsize=8M -c "$(cat main.py) -$(cat $file) -result = unittest.main() ; sys.exit(0 if result.wasSuccessful() else 1) " - result=$? - fi - else - if [ ! -z "$ondevice" ]; then - echo "Hack: reset the device to make sure no previous UnitTest classes have been registered..." - "$mpremote" reset - sleep 15 - fi - - echo "Device execution" - # NOTE: On device, the OS is already running with boot.py and main.py executed, - # so we don't need to (and shouldn't) re-run them. The system is already initialized. - cleanname=$(echo "$file" | sed "s#/#_#g") - testlog=/tmp/"$cleanname".log - echo "$test logging to $testlog" - if [ $is_graphical -eq 1 ]; then - # Graphical test: system already initialized, just add test paths - "$mpremote" exec "$(cat main.py) ; sys.path.append('tests') -$(cat $file) -result = unittest.main() -if result.wasSuccessful(): - print('TEST WAS A SUCCESS') -else: - print('TEST WAS A FAILURE') -" | tee "$testlog" - else - # Regular test: no boot files - "$mpremote" exec "$(cat main.py) -$(cat $file) -result = unittest.main() -if result.wasSuccessful(): - print('TEST WAS A SUCCESS') -else: - print('TEST WAS A FAILURE') -" | tee "$testlog" - fi - grep -q "TEST WAS A SUCCESS" "$testlog" - result=$? - fi - popd - return "$result" -} - -failed=0 -ran=0 - -if [ -z "$onetest" ]; then - echo "Usage: $0 [one_test_to_run.py] [--ondevice]" - echo "Example: $0 tests/simple.py" - echo "Example: $0 tests/simple.py --ondevice" - echo "Example: $0 --ondevice" - echo - echo "If no test is specified: run all tests from $testdir on local machine." - echo - echo "The '--ondevice' flag will run the test(s) on a connected device using mpremote.py (should be on the PATH) over a serial connection." - while read file; do - one_test "$file" - result=$? - if [ $result -ne 0 ]; then - echo -e "\n\n\nWARNING: test $file got error $result !!!\n\n\n" - failed=$(expr $failed \+ 1) - exit 1 - else - ran=$(expr $ran \+ 1) - fi - done < <( find "$testdir" -iname "test_*.py" ) -else - echo "doing $onetest" - one_test $(readlink -f "$onetest") - [ $? -ne 0 ] && failed=1 -fi - - -if [ $failed -ne 0 ]; then - echo "ERROR: $failed of the $ran tests failed" - exit 1 -else - echo "GOOD: none of the $ran tests failed" - exit 0 -fi -