You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
WifiService: connect to strongest networks first
This commit is contained in:
@@ -11,6 +11,8 @@ internal_filesystem/data
|
||||
internal_filesystem/sdcard
|
||||
internal_filesystem/tests
|
||||
|
||||
internal_filesystem_excluded/
|
||||
|
||||
# these tests contain actual NWC URLs:
|
||||
tests/manual_test_nwcwallet_alby.py
|
||||
tests/manual_test_nwcwallet_cashu.py
|
||||
@@ -21,3 +23,6 @@ __pycache__/
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
|
||||
# these get created:
|
||||
c_mpos/c_mpos
|
||||
|
||||
@@ -46,6 +46,7 @@ class WifiService:
|
||||
def connect(network_module=None):
|
||||
"""
|
||||
Scan for available networks and connect to the first saved network found.
|
||||
Networks are tried in order of signal strength (strongest first).
|
||||
|
||||
Args:
|
||||
network_module: Network module for dependency injection (testing)
|
||||
@@ -63,9 +64,14 @@ class WifiService:
|
||||
# Scan for available networks
|
||||
networks = wlan.scan()
|
||||
|
||||
# Sort networks by RSSI (signal strength) in descending order
|
||||
# RSSI is at index 3, higher values (less negative) = stronger signal
|
||||
networks = sorted(networks, key=lambda n: n[3], reverse=True)
|
||||
|
||||
for n in networks:
|
||||
ssid = n[0].decode()
|
||||
print(f"WifiService: Found network '{ssid}'")
|
||||
rssi = n[3]
|
||||
print(f"WifiService: Found network '{ssid}' (RSSI: {rssi} dBm)")
|
||||
|
||||
if ssid in WifiService.access_points:
|
||||
password = WifiService.access_points.get(ssid).get("password")
|
||||
|
||||
@@ -455,3 +455,259 @@ class TestWifiServiceDisconnect(unittest.TestCase):
|
||||
WifiService.disconnect(network_module=None)
|
||||
|
||||
|
||||
class TestWifiServiceRSSISorting(unittest.TestCase):
|
||||
"""Test RSSI-based network prioritization."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test fixtures."""
|
||||
MockSharedPreferences.reset_all()
|
||||
WifiService.access_points = {}
|
||||
WifiService.wifi_busy = False
|
||||
|
||||
def tearDown(self):
|
||||
"""Clean up after tests."""
|
||||
WifiService.access_points = {}
|
||||
WifiService.wifi_busy = False
|
||||
MockSharedPreferences.reset_all()
|
||||
|
||||
def test_networks_sorted_by_rssi_strongest_first(self):
|
||||
"""Test that networks are sorted by RSSI with strongest first."""
|
||||
# Create mock networks with different RSSI values
|
||||
# Format: (ssid, bssid, channel, rssi, security, hidden)
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
# Unsorted networks (weak, strong, medium)
|
||||
mock_wlan._scan_results = [
|
||||
(b'WeakNetwork', b'\xaa\xbb\xcc\xdd\xee\xff', 6, -85, 3, False),
|
||||
(b'StrongNetwork', b'\x11\x22\x33\x44\x55\x66', 1, -45, 3, False),
|
||||
(b'MediumNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -65, 3, False),
|
||||
]
|
||||
|
||||
# Configure all as saved networks
|
||||
WifiService.access_points = {
|
||||
'WeakNetwork': {'password': 'weak123'},
|
||||
'StrongNetwork': {'password': 'strong123'},
|
||||
'MediumNetwork': {'password': 'medium123'}
|
||||
}
|
||||
|
||||
# Track connection attempts
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
# Succeed on first attempt
|
||||
mock_wlan._connected = True
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should try strongest first (-45 dBm)
|
||||
self.assertEqual(connection_attempts[0], 'StrongNetwork')
|
||||
# Should only try one (first succeeds)
|
||||
self.assertEqual(len(connection_attempts), 1)
|
||||
|
||||
def test_multiple_networks_tried_in_rssi_order(self):
|
||||
"""Test that multiple networks are tried in RSSI order when first fails."""
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
# Three networks with different signal strengths
|
||||
mock_wlan._scan_results = [
|
||||
(b'BadNetwork1', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -40, 3, False),
|
||||
(b'BadNetwork2', b'\x11\x22\x33\x44\x55\x66', 6, -50, 3, False),
|
||||
(b'GoodNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -60, 3, False),
|
||||
]
|
||||
|
||||
WifiService.access_points = {
|
||||
'BadNetwork1': {'password': 'pass1'},
|
||||
'BadNetwork2': {'password': 'pass2'},
|
||||
'GoodNetwork': {'password': 'pass3'}
|
||||
}
|
||||
|
||||
# Track attempts and make first two fail
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
# Only succeed on third attempt
|
||||
if len(connection_attempts) >= 3:
|
||||
mock_wlan._connected = True
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertTrue(result)
|
||||
# Verify order: strongest to weakest
|
||||
self.assertEqual(connection_attempts[0], 'BadNetwork1') # RSSI -40
|
||||
self.assertEqual(connection_attempts[1], 'BadNetwork2') # RSSI -50
|
||||
self.assertEqual(connection_attempts[2], 'GoodNetwork') # RSSI -60
|
||||
self.assertEqual(len(connection_attempts), 3)
|
||||
|
||||
def test_duplicate_ssid_strongest_tried_first(self):
|
||||
"""Test that with duplicate SSIDs, strongest signal is tried first."""
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
# Real-world scenario: Multiple APs with same SSID
|
||||
mock_wlan._scan_results = [
|
||||
(b'MyNetwork', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -70, 3, False),
|
||||
(b'MyNetwork', b'\x11\x22\x33\x44\x55\x66', 6, -50, 3, False), # Strongest
|
||||
(b'MyNetwork', b'\x77\x88\x99\xaa\xbb\xcc', 11, -85, 3, False),
|
||||
]
|
||||
|
||||
WifiService.access_points = {
|
||||
'MyNetwork': {'password': 'mypass123'}
|
||||
}
|
||||
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
# Succeed on first
|
||||
mock_wlan._connected = True
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertTrue(result)
|
||||
# Should only try once (first is strongest and succeeds)
|
||||
self.assertEqual(len(connection_attempts), 1)
|
||||
self.assertEqual(connection_attempts[0], 'MyNetwork')
|
||||
|
||||
def test_rssi_order_with_real_scan_data(self):
|
||||
"""Test with real scan data from actual ESP32 device."""
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
# Real scan output from user's example
|
||||
mock_wlan._scan_results = [
|
||||
(b'Channel 8', b'\xde\xec^\x8f\x00A', 11, -47, 3, False),
|
||||
(b'Baptistus', b'\xd8\xec^\x8f\x00A', 11, -48, 7, False),
|
||||
(b'telenet-BD74DC9', b'TgQ>t\xe7', 11, -70, 3, False),
|
||||
(b'Galaxy S10+64bf', b'b\x19\xdf\xef\xb0\x8f', 11, -83, 3, False),
|
||||
(b'Najeeb\xe2\x80\x99s iPhone', b"F\x07'\xb8\x0b0", 6, -84, 7, False),
|
||||
(b'DIRECT-83-HP OfficeJet Pro 7740', b'\x1a`$dk\x83', 1, -87, 3, False),
|
||||
(b'Channel 8', b'\xde\xec^\xe1#w', 1, -91, 3, False),
|
||||
(b'Baptistus', b'\xd8\xec^\xe1#w', 1, -91, 7, False),
|
||||
(b'Proximus-Home-596457', b'\xf4\x05\x95\xf9A\xf1', 1, -93, 3, False),
|
||||
(b'Proximus-Home-596457', b'\xcc\x00\xf1j}\x94', 1, -93, 3, False),
|
||||
(b'BASE-9104320', b'4,\xc4\xe7\x01\xb7', 1, -94, 3, False),
|
||||
]
|
||||
|
||||
# Save several networks
|
||||
WifiService.access_points = {
|
||||
'Channel 8': {'password': 'pass1'},
|
||||
'Baptistus': {'password': 'pass2'},
|
||||
'telenet-BD74DC9': {'password': 'pass3'},
|
||||
'Galaxy S10+64bf': {'password': 'pass4'},
|
||||
}
|
||||
|
||||
# Track attempts and fail first to see ordering
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
# Succeed on second attempt
|
||||
if len(connection_attempts) >= 2:
|
||||
mock_wlan._connected = True
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertTrue(result)
|
||||
# Expected order: Channel 8 (-47), Baptistus (-48), telenet (-70), Galaxy (-83)
|
||||
self.assertEqual(connection_attempts[0], 'Channel 8')
|
||||
self.assertEqual(connection_attempts[1], 'Baptistus')
|
||||
self.assertEqual(len(connection_attempts), 2)
|
||||
|
||||
def test_sorting_preserves_network_data_integrity(self):
|
||||
"""Test that sorting doesn't corrupt or lose network data."""
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
# Networks with various attributes
|
||||
mock_wlan._scan_results = [
|
||||
(b'Net3', b'\xaa\xaa\xaa\xaa\xaa\xaa', 11, -90, 3, False),
|
||||
(b'Net1', b'\xbb\xbb\xbb\xbb\xbb\xbb', 1, -40, 7, True), # Hidden
|
||||
(b'Net2', b'\xcc\xcc\xcc\xcc\xcc\xcc', 6, -60, 2, False),
|
||||
]
|
||||
|
||||
WifiService.access_points = {
|
||||
'Net1': {'password': 'p1'},
|
||||
'Net2': {'password': 'p2'},
|
||||
'Net3': {'password': 'p3'}
|
||||
}
|
||||
|
||||
# Track attempts to verify all are tried
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
# Never succeed, try all
|
||||
pass
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertFalse(result) # No connection succeeded
|
||||
# Verify all 3 were attempted in RSSI order
|
||||
self.assertEqual(len(connection_attempts), 3)
|
||||
self.assertEqual(connection_attempts[0], 'Net1') # RSSI -40
|
||||
self.assertEqual(connection_attempts[1], 'Net2') # RSSI -60
|
||||
self.assertEqual(connection_attempts[2], 'Net3') # RSSI -90
|
||||
|
||||
def test_no_saved_networks_in_scan(self):
|
||||
"""Test behavior when scan finds no saved networks."""
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
mock_wlan._scan_results = [
|
||||
(b'UnknownNet1', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -50, 3, False),
|
||||
(b'UnknownNet2', b'\x11\x22\x33\x44\x55\x66', 6, -60, 3, False),
|
||||
]
|
||||
|
||||
WifiService.access_points = {
|
||||
'SavedNetwork': {'password': 'pass123'}
|
||||
}
|
||||
|
||||
connection_attempts = []
|
||||
|
||||
def mock_connect(ssid, password):
|
||||
connection_attempts.append(ssid)
|
||||
|
||||
mock_wlan.connect = mock_connect
|
||||
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
|
||||
self.assertFalse(result)
|
||||
# No attempts should be made
|
||||
self.assertEqual(len(connection_attempts), 0)
|
||||
|
||||
def test_rssi_logging_shows_signal_strength(self):
|
||||
"""Test that RSSI value is logged during scan (for debugging)."""
|
||||
# This is more of a documentation test to verify the log format
|
||||
mock_network = MockNetwork(connected=False)
|
||||
mock_wlan = mock_network.WLAN(mock_network.STA_IF)
|
||||
|
||||
mock_wlan._scan_results = [
|
||||
(b'TestNet', b'\xaa\xbb\xcc\xdd\xee\xff', 1, -55, 3, False),
|
||||
]
|
||||
|
||||
WifiService.access_points = {
|
||||
'TestNet': {'password': 'pass'}
|
||||
}
|
||||
|
||||
# The connect method now logs "Found network 'TestNet' (RSSI: -55 dBm)"
|
||||
# This test just verifies it doesn't crash
|
||||
result = WifiService.connect(network_module=mock_network)
|
||||
# Since mock doesn't actually connect, this will likely be False
|
||||
# but the important part is the code runs without error
|
||||
|
||||
|
||||
|
||||
@@ -1,152 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
mydir=$(readlink -f "$0")
|
||||
mydir=$(dirname "$mydir")
|
||||
testdir="$mydir"
|
||||
scriptdir=$(readlink -f "$mydir"/../scripts/)
|
||||
fs="$mydir"/../internal_filesystem/
|
||||
mpremote="$mydir"/../lvgl_micropython/lib/micropython/tools/mpremote/mpremote.py
|
||||
|
||||
# Parse arguments
|
||||
ondevice=""
|
||||
onetest=""
|
||||
|
||||
while [ $# -gt 0 ]; do
|
||||
case "$1" in
|
||||
--ondevice)
|
||||
ondevice="yes"
|
||||
;;
|
||||
*)
|
||||
onetest="$1"
|
||||
;;
|
||||
esac
|
||||
shift
|
||||
done
|
||||
|
||||
# print os and set binary
|
||||
os_name=$(uname -s)
|
||||
if [ "$os_name" = "Darwin" ]; then
|
||||
echo "Running on macOS"
|
||||
binary="$scriptdir"/../lvgl_micropython/build/lvgl_micropy_macOS
|
||||
else
|
||||
# other cases can be added here
|
||||
echo "Running on $os_name"
|
||||
binary="$scriptdir"/../lvgl_micropython/build/lvgl_micropy_unix
|
||||
fi
|
||||
|
||||
binary=$(readlink -f "$binary")
|
||||
chmod +x "$binary"
|
||||
|
||||
one_test() {
|
||||
file="$1"
|
||||
if [ ! -f "$file" ]; then
|
||||
echo "ERROR: $file is not a regular, existing file!"
|
||||
exit 1
|
||||
fi
|
||||
pushd "$fs"
|
||||
echo "Testing $file"
|
||||
|
||||
# Detect if this is a graphical test (filename contains "graphical")
|
||||
if echo "$file" | grep -q "graphical"; then
|
||||
echo "Detected graphical test - including boot and main files"
|
||||
is_graphical=1
|
||||
# Get absolute path to tests directory for imports
|
||||
tests_abs_path=$(readlink -f "$testdir")
|
||||
else
|
||||
is_graphical=0
|
||||
fi
|
||||
|
||||
if [ -z "$ondevice" ]; then
|
||||
# Desktop execution
|
||||
if [ $is_graphical -eq 1 ]; then
|
||||
# Graphical test: include boot_unix.py and main.py
|
||||
"$binary" -X heapsize=8M -c "$(cat main.py) ; import mpos.main ; import mpos.apps; sys.path.append(\"$tests_abs_path\")
|
||||
$(cat $file)
|
||||
result = unittest.main() ; sys.exit(0 if result.wasSuccessful() else 1) "
|
||||
result=$?
|
||||
else
|
||||
# Regular test: no boot files
|
||||
"$binary" -X heapsize=8M -c "$(cat main.py)
|
||||
$(cat $file)
|
||||
result = unittest.main() ; sys.exit(0 if result.wasSuccessful() else 1) "
|
||||
result=$?
|
||||
fi
|
||||
else
|
||||
if [ ! -z "$ondevice" ]; then
|
||||
echo "Hack: reset the device to make sure no previous UnitTest classes have been registered..."
|
||||
"$mpremote" reset
|
||||
sleep 15
|
||||
fi
|
||||
|
||||
echo "Device execution"
|
||||
# NOTE: On device, the OS is already running with boot.py and main.py executed,
|
||||
# so we don't need to (and shouldn't) re-run them. The system is already initialized.
|
||||
cleanname=$(echo "$file" | sed "s#/#_#g")
|
||||
testlog=/tmp/"$cleanname".log
|
||||
echo "$test logging to $testlog"
|
||||
if [ $is_graphical -eq 1 ]; then
|
||||
# Graphical test: system already initialized, just add test paths
|
||||
"$mpremote" exec "$(cat main.py) ; sys.path.append('tests')
|
||||
$(cat $file)
|
||||
result = unittest.main()
|
||||
if result.wasSuccessful():
|
||||
print('TEST WAS A SUCCESS')
|
||||
else:
|
||||
print('TEST WAS A FAILURE')
|
||||
" | tee "$testlog"
|
||||
else
|
||||
# Regular test: no boot files
|
||||
"$mpremote" exec "$(cat main.py)
|
||||
$(cat $file)
|
||||
result = unittest.main()
|
||||
if result.wasSuccessful():
|
||||
print('TEST WAS A SUCCESS')
|
||||
else:
|
||||
print('TEST WAS A FAILURE')
|
||||
" | tee "$testlog"
|
||||
fi
|
||||
grep -q "TEST WAS A SUCCESS" "$testlog"
|
||||
result=$?
|
||||
fi
|
||||
popd
|
||||
return "$result"
|
||||
}
|
||||
|
||||
failed=0
|
||||
ran=0
|
||||
|
||||
if [ -z "$onetest" ]; then
|
||||
echo "Usage: $0 [one_test_to_run.py] [--ondevice]"
|
||||
echo "Example: $0 tests/simple.py"
|
||||
echo "Example: $0 tests/simple.py --ondevice"
|
||||
echo "Example: $0 --ondevice"
|
||||
echo
|
||||
echo "If no test is specified: run all tests from $testdir on local machine."
|
||||
echo
|
||||
echo "The '--ondevice' flag will run the test(s) on a connected device using mpremote.py (should be on the PATH) over a serial connection."
|
||||
while read file; do
|
||||
one_test "$file"
|
||||
result=$?
|
||||
if [ $result -ne 0 ]; then
|
||||
echo -e "\n\n\nWARNING: test $file got error $result !!!\n\n\n"
|
||||
failed=$(expr $failed \+ 1)
|
||||
exit 1
|
||||
else
|
||||
ran=$(expr $ran \+ 1)
|
||||
fi
|
||||
done < <( find "$testdir" -iname "test_*.py" )
|
||||
else
|
||||
echo "doing $onetest"
|
||||
one_test $(readlink -f "$onetest")
|
||||
[ $? -ne 0 ] && failed=1
|
||||
fi
|
||||
|
||||
|
||||
if [ $failed -ne 0 ]; then
|
||||
echo "ERROR: $failed of the $ran tests failed"
|
||||
exit 1
|
||||
else
|
||||
echo "GOOD: none of the $ran tests failed"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user