diff --git a/internal_filesystem/lib/websocket.py b/internal_filesystem/lib/websocket.py index 8b86ed61..d5112035 100644 --- a/internal_filesystem/lib/websocket.py +++ b/internal_filesystem/lib/websocket.py @@ -3,7 +3,6 @@ # Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp import uasyncio as asyncio -import _thread import time import ucollections import aiohttp @@ -34,18 +33,18 @@ class WebSocketConnectionClosedException(WebSocketException): class WebSocketTimeoutException(WebSocketException): pass -# Queue for cross-thread callback dispatching +# Queue for callback dispatching (in same thread) _callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100 def _run_callback(callback, *args): - """Add callback to queue for main thread execution.""" + """Add callback to queue for execution.""" try: _callback_queue.append((callback, args)) except IndexError: _log_error("Callback queue full, dropping callback") def _process_callbacks(): - """Process queued callbacks in the main thread.""" + """Process queued callbacks.""" while _callback_queue: try: callback, args = _callback_queue.popleft() @@ -97,21 +96,19 @@ class WebSocketApp: self.ws = None self.session = None self.running = False - self.thread = None self.ping_interval = 0 self.ping_timeout = None self.ping_payload = "" self.last_ping_tm = 0 self.last_pong_tm = 0 self.has_errored = False + self._loop = asyncio.get_event_loop() def send(self, data, opcode=ABNF.OPCODE_TEXT): """Send a message.""" if not self.ws or not self.running: raise WebSocketConnectionClosedException("Connection is already closed.") - # Schedule send in async loop - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self._send_async(data, opcode), loop) + asyncio.create_task(self._send_async(data, opcode)) def send_text(self, text_data): """Send UTF-8 text.""" @@ -124,18 +121,22 @@ class WebSocketApp: def close(self, **kwargs): """Close the WebSocket connection.""" self.running = False - if self.ws: - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self.ws.close(), loop) - if self.session: - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self.session.__aexit__(None, None, None), loop) + asyncio.create_task(self._close_async()) + + async def _close_async(self): + """Async close implementation.""" + try: + if self.ws and not self.ws.ws.closed: + await self.ws.close() + if self.session: + await self.session.__aexit__(None, None, None) + except Exception as e: + _log_error(f"Error closing WebSocket: {e}") def _start_ping_thread(self): - """Simulate ping/pong in async loop.""" + """Start ping task.""" if self.ping_interval: - loop = asyncio.get_event_loop() - asyncio.run_coroutine_threadsafe(self._send_ping_async(), loop) + asyncio.create_task(self._send_ping_async()) def _stop_ping_thread(self): """No-op, ping handled in async loop.""" @@ -176,7 +177,7 @@ class WebSocketApp: proxy_type=None, reconnect=None, ): - """Run the WebSocket event loop.""" + """Run the WebSocket event loop in the main thread.""" if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type: raise WebSocketException("Proxy and sockopt not supported in MicroPython") if dispatcher: @@ -193,25 +194,19 @@ class WebSocketApp: self.ping_payload = ping_payload self.running = True - # Start async event loop in a separate thread - self.thread = _thread.start_new_thread(self._run_async_loop, ()) - - # Main thread processes callbacks + # Run the event loop in the main thread try: - while self.running: - _process_callbacks() - time.sleep(0.01) # Yield to other tasks + self._loop.run_until_complete(self._async_main()) except KeyboardInterrupt: + print("run_forever got KeyboardInterrupt") self.close() return False + except Exception as e: + _log_error(f"run_forever got general exception: {e} - returning True") + self.has_errored = True + return True return self.has_errored - def _run_async_loop(self): - """Run uasyncio event loop in a separate thread.""" - loop = asyncio.get_event_loop() - loop.run_until_complete(self._async_main()) - loop.run_forever() - async def _async_main(self): """Main async loop for WebSocket handling.""" reconnect = 0 # Default, as RECONNECT may not be defined @@ -227,7 +222,7 @@ class WebSocketApp: try: await self._connect_and_run() except Exception as e: - print(f"_async_main got exception {e}") + _log_error(f"_async_main got exception: {e}") self.has_errored = True _run_callback(self.on_error, self, e) if not reconnect: @@ -239,11 +234,8 @@ class WebSocketApp: # Cleanup self.running = False - if self.ws: - print("websocket.py: closing...") - await self.ws.close() - if self.session: - await self.session.__aexit__(None, None, None) + _log_debug("websocket.py: closing...") + await self._close_async() _run_callback(self.on_close, self, None, None) async def _connect_and_run(self): @@ -270,6 +262,7 @@ class WebSocketApp: raise WebSocketTimeoutException("ping/pong timed out") # Process message + _process_callbacks() # Process callbacks in same thread if msg.type == WSMsgType.TEXT: data = msg.data _run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True)