You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Rework WifiService to lock while scanning
This commit is contained in:
@@ -8,6 +8,7 @@ from mpos.apps import Activity, Intent
|
||||
|
||||
import mpos.config
|
||||
import mpos.ui.anim
|
||||
import mpos.wifi
|
||||
|
||||
have_network = True
|
||||
try:
|
||||
@@ -46,8 +47,8 @@ class WiFi(Activity):
|
||||
self.aplist.align(lv.ALIGN.TOP_MID,0,0)
|
||||
print("create_ui: Creating error label")
|
||||
self.error_label=lv.label(main_screen)
|
||||
self.error_label.set_text("")
|
||||
self.error_label.align(lv.ALIGN.BOTTOM_MID,0,-40)
|
||||
self.error_label.set_text("THIS IS ERROR TEXT THAT WILL BE SET LATER")
|
||||
self.error_label.align_to(self.aplist, lv.ALIGN.OUT_BOTTOM_MID,0,0)
|
||||
self.error_label.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
print("create_ui: Creating Scan button")
|
||||
self.scan_button=lv.button(main_screen)
|
||||
@@ -63,8 +64,12 @@ class WiFi(Activity):
|
||||
global access_points
|
||||
access_points = mpos.config.SharedPreferences("com.micropythonos.system.wifiservice").get_dict("access_points")
|
||||
self.keep_running = True
|
||||
if len(self.ssids) == 0:
|
||||
self.start_scan_networks()
|
||||
if mpos.wifi.WifiService.wifi_busy == False:
|
||||
mpos.wifi.WifiService.wifi_busy = True
|
||||
if len(self.ssids) == 0:
|
||||
self.start_scan_networks()
|
||||
else:
|
||||
self.show_error("Wifi is busy, please try again later.")
|
||||
|
||||
def onStop(self, screen):
|
||||
self.keep_running = False
|
||||
@@ -75,7 +80,7 @@ class WiFi(Activity):
|
||||
print(f"show_error: Displaying error: {message}")
|
||||
lv.async_call(lambda l: self.error_label.set_text(message), None)
|
||||
lv.async_call(lambda l: self.error_label.remove_flag(lv.obj.FLAG.HIDDEN), None)
|
||||
timer=lv.timer_create(lambda t: self.error_label.add_flag(lv.obj.FLAG.HIDDEN),3000,None)
|
||||
timer=lv.timer_create(lambda t: self.error_label.add_flag(lv.obj.FLAG.HIDDEN),5000,None)
|
||||
timer.set_repeat_count(1)
|
||||
|
||||
def scan_networks_thread(self):
|
||||
@@ -99,6 +104,7 @@ class WiFi(Activity):
|
||||
self.show_error("Wi-Fi scan failed")
|
||||
# scan done:
|
||||
self.busy_scanning = False
|
||||
mpos.wifi.WifiService.wifi_busy = False
|
||||
if self.keep_running:
|
||||
# Schedule UI updates because different thread
|
||||
lv.async_call(lambda l: self.scan_button_label.set_text(self.scan_button_scan_text), None)
|
||||
@@ -147,7 +153,7 @@ class WiFi(Activity):
|
||||
def scan_cb(self, event):
|
||||
print("scan_cb: Scan button clicked, refreshing list")
|
||||
self.start_scan_networks()
|
||||
|
||||
|
||||
def select_ssid_cb(self,ssid):
|
||||
print(f"select_ssid_cb: SSID selected: {ssid}")
|
||||
intent = Intent(activity_class=PasswordPage)
|
||||
|
||||
@@ -10,6 +10,9 @@ import time
|
||||
import mpos.config
|
||||
import mpos.time
|
||||
|
||||
# crude lock on wifi:
|
||||
wifi_busy = False
|
||||
|
||||
def auto_connect():
|
||||
networks = wlan.scan()
|
||||
for n in networks:
|
||||
@@ -48,7 +51,6 @@ def attempt_connecting(ssid,password):
|
||||
print(f"auto_connect.py attempt_connecting: Connection error: {e}")
|
||||
return False
|
||||
|
||||
|
||||
print("WifiService.py running")
|
||||
|
||||
have_network=True
|
||||
@@ -65,12 +67,15 @@ if not have_network:
|
||||
print("WifiService.py: no network module found, exiting...")
|
||||
elif len(access_points):
|
||||
wlan=network.WLAN(network.STA_IF)
|
||||
wlan.active(False) # restart WiFi hardware in case it's in a bad state
|
||||
wlan.active(True)
|
||||
if auto_connect():
|
||||
print("WifiService.py managed to connect.")
|
||||
else:
|
||||
print("WifiService.py did not manage to connect.")
|
||||
wlan.active(False) # disable to conserve power
|
||||
if not wifi_busy:
|
||||
wifi_busy = True
|
||||
wlan.active(False) # restart WiFi hardware in case it's in a bad state
|
||||
wlan.active(True)
|
||||
if auto_connect():
|
||||
print("WifiService.py managed to connect.")
|
||||
else:
|
||||
print("WifiService.py did not manage to connect.")
|
||||
wlan.active(False) # disable to conserve power
|
||||
wifi_busy = False
|
||||
else:
|
||||
print("WifiService.py: not access points configured, exiting...")
|
||||
|
||||
@@ -219,17 +219,6 @@ def parse_manifest(manifest_path):
|
||||
except OSError:
|
||||
print(f"parse_manifest: error loading manifest_path: {manifest_path}")
|
||||
return default_app
|
||||
|
||||
|
||||
|
||||
def auto_connect():
|
||||
builtin_auto_connect = "builtin/system/WifiService.py"
|
||||
try:
|
||||
print(f"Starting {builtin_auto_connect}...")
|
||||
stat = uos.stat(builtin_auto_connect)
|
||||
execute_script_new_thread(builtin_auto_connect, True)
|
||||
except Exception as e:
|
||||
print("Couldn't execute {builtin_auto_connect} because exception {e}, continuing...")
|
||||
|
||||
|
||||
class Activity:
|
||||
|
||||
@@ -2,6 +2,7 @@ import utime # for timing calls
|
||||
|
||||
import lvgl as lv
|
||||
import mpos.apps
|
||||
import mpos.wifi
|
||||
from mpos.ui.anim import WidgetAnimator
|
||||
|
||||
th = None
|
||||
@@ -197,7 +198,7 @@ def create_notification_bar():
|
||||
print("Warning: could not check WLAN status:", str(e))
|
||||
|
||||
def update_wifi_icon(timer):
|
||||
if not can_check_network or network.WLAN(network.STA_IF).isconnected():
|
||||
if mpos.wifi.WifiService.is_connected():
|
||||
wifi_icon.remove_flag(lv.obj.FLAG.HIDDEN)
|
||||
else:
|
||||
wifi_icon.add_flag(lv.obj.FLAG.HIDDEN)
|
||||
|
||||
@@ -0,0 +1,98 @@
|
||||
# Automatically connect to the WiFi, based on the saved networks
|
||||
# Manage concurrent accesses to the wifi (scan while connect, connect while scan etc)
|
||||
# Manage saved networks
|
||||
# This gets started in a new thread, does an autoconnect, and exits.
|
||||
|
||||
import ujson
|
||||
import os
|
||||
import time
|
||||
|
||||
import mpos.config
|
||||
import mpos.time
|
||||
|
||||
have_network = False
|
||||
try:
|
||||
import network
|
||||
have_network = True
|
||||
except Exception as e:
|
||||
print("Could not import network, have_network=False")
|
||||
|
||||
class WifiService():
|
||||
|
||||
wifi_busy = False # crude lock on wifi
|
||||
|
||||
@staticmethod
|
||||
def connect():
|
||||
networks = wlan.scan()
|
||||
for n in networks:
|
||||
ssid = n[0].decode()
|
||||
print(f"auto_connect: checking ssid '{ssid}'")
|
||||
if ssid in access_points:
|
||||
password = access_points.get(ssid).get("password")
|
||||
print(f"auto_connect: attempting to connect to saved network {ssid} with password {password}")
|
||||
if attempt_connecting(ssid,password):
|
||||
print(f"auto_connect: Connected to {ssid}")
|
||||
return True
|
||||
else:
|
||||
print(f"auto_connect: failed to connect to {ssid}")
|
||||
else:
|
||||
print(f"auto_connect: not trying {ssid} because it hasn't been configured")
|
||||
print("auto_connect: no known networks connected")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def attempt_connecting(ssid,password):
|
||||
print(f"auto_connect.py attempt_connecting: Attempting to connect to SSID: {ssid}")
|
||||
try:
|
||||
wlan.connect(ssid,password)
|
||||
for i in range(10):
|
||||
if wlan.isconnected():
|
||||
print(f"auto_connect.py attempt_connecting: Connected to {ssid} after {i+1} seconds")
|
||||
mpos.time.sync_time()
|
||||
return True
|
||||
elif not wlan.active(): # wificonf app or others might stop the wifi, no point in continuing then
|
||||
print("auto_connect.py attempt_connecting: Someone disabled wifi, bailing out...")
|
||||
return False
|
||||
print(f"auto_connect.py attempt_connecting: Waiting for connection, attempt {i+1}/10")
|
||||
time.sleep(1)
|
||||
print(f"auto_connect.py attempt_connecting: Failed to connect to {ssid}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"auto_connect.py attempt_connecting: Connection error: {e}")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def auto_connect():
|
||||
print("auto_connect thread running")
|
||||
|
||||
# load config:
|
||||
access_points = mpos.config.SharedPreferences("com.micropythonos.system.wifiservice").get_dict("access_points")
|
||||
if not len(access_points):
|
||||
print("WifiService.py: not access points configured, exiting...")
|
||||
return
|
||||
|
||||
if not WifiService.wifi_busy:
|
||||
WifiService.wifi_busy = True
|
||||
if not have_network:
|
||||
print("auto_connect: no network module found, waiting to simulate connection...")
|
||||
time.sleep(10)
|
||||
print("auto_connect: wifi connect simulation done")
|
||||
else:
|
||||
wlan=network.WLAN(network.STA_IF)
|
||||
wlan.active(False) # restart WiFi hardware in case it's in a bad state
|
||||
wlan.active(True)
|
||||
if connect():
|
||||
print("WifiService.py managed to connect.")
|
||||
else:
|
||||
print("WifiService.py did not manage to connect.")
|
||||
wlan.active(False) # disable to conserve power
|
||||
WifiService.wifi_busy = False
|
||||
|
||||
@staticmethod
|
||||
def is_connected():
|
||||
if WifiService.wifi_busy:
|
||||
return False
|
||||
elif not have_network:
|
||||
return True
|
||||
else:
|
||||
return network.WLAN(network.STA_IF).isconnected()
|
||||
@@ -1,4 +1,5 @@
|
||||
import task_handler
|
||||
import _thread
|
||||
|
||||
# Allow LVGL M:/path/to/file or M:relative/path/to/file to work for image set_src etc
|
||||
import fs_driver
|
||||
@@ -35,7 +36,13 @@ from mpos import apps
|
||||
|
||||
apps.execute_script("builtin/system/button.py", True) # Install button handler through IRQ
|
||||
|
||||
apps.auto_connect()
|
||||
try:
|
||||
import mpos.wifi
|
||||
import mpos.apps
|
||||
_thread.stack_size(mpos.apps.good_stack_size())
|
||||
_thread.start_new_thread(mpos.wifi.WifiService.auto_connect, ())
|
||||
except Exception as e:
|
||||
print(f"Couldn't start mpos.wifi.WifiService.auto_connect thread because: {e}")
|
||||
|
||||
apps.restart_launcher()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user