From f32ebbd2a36970b535c1debccf0d3f2091e739c8 Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Mon, 19 May 2025 22:19:47 +0200 Subject: [PATCH] websocket works better now, the issue was an exception related to the empty queue --- internal_filesystem/lib/queue.py | 2 + internal_filesystem/lib/threading.py | 1 + internal_filesystem/lib/websocket.py | 118 ++++++++++++++++++--------- 3 files changed, 83 insertions(+), 38 deletions(-) diff --git a/internal_filesystem/lib/queue.py b/internal_filesystem/lib/queue.py index 88ba52dc..9ffe18c8 100644 --- a/internal_filesystem/lib/queue.py +++ b/internal_filesystem/lib/queue.py @@ -25,10 +25,12 @@ class Queue: with self._lock: if not self._queue: raise RuntimeError("Queue is empty") + print("queue not empty, returning one object!!!") return self._queue.pop(0) else: if not self._queue: raise RuntimeError("Queue is empty") + print("queue not empty, returning one object!!!") return self._queue.pop(0) def qsize(self): diff --git a/internal_filesystem/lib/threading.py b/internal_filesystem/lib/threading.py index e055fd71..2e72f502 100644 --- a/internal_filesystem/lib/threading.py +++ b/internal_filesystem/lib/threading.py @@ -11,6 +11,7 @@ class Thread: def start(self): # In MicroPython, _thread.start_new_thread doesn't support daemon threads directly # We store the daemon attribute for compatibility, but it may not affect termination + #_thread.stack_size(32*1024) _thread.start_new_thread(self.run, ()) def run(self): diff --git a/internal_filesystem/lib/websocket.py b/internal_filesystem/lib/websocket.py index a2ed4bc7..344c20ae 100644 --- a/internal_filesystem/lib/websocket.py +++ b/internal_filesystem/lib/websocket.py @@ -8,12 +8,12 @@ import ucollections import aiohttp from aiohttp import WSMsgType, ClientWebSocketResponse -# Simplified logging for MicroPython +# Simplified logging for MicroPython with timestamps def _log_debug(msg): - print(f"DEBUG: {msg}") + print(f"[DEBUG {time.ticks_ms()}] {msg}") def _log_error(msg): - print(f"ERROR: {msg}") + print(f"[ERROR {time.ticks_ms()}] {msg}") # Simplified ABNF for opcode compatibility class ABNF: @@ -33,31 +33,36 @@ class WebSocketConnectionClosedException(WebSocketException): class WebSocketTimeoutException(WebSocketException): pass -# Queue for callback dispatching (in same thread) +# Queue for callback dispatching _callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100 def _run_callback(callback, *args): """Add callback to queue for execution.""" try: _callback_queue.append((callback, args)) + _log_debug(f"Queued callback {callback}, queue size: {len(_callback_queue)}") except IndexError: _log_error("Callback queue full, dropping callback") -def _process_callbacks(): - """Process queued callbacks.""" - while _callback_queue: - print("processing callbacks queue...") - try: - callback, args = _callback_queue.popleft() - if callback is not None: - try: - callback(*args) - except Exception as e: - _log_error(f"Error in callback {callback}: {e}") - else: - print("Not calling None callback") - except IndexError: - break # Queue is empty +async def _process_callbacks_async(): + """Process queued callbacks asynchronously.""" + while True: + while _callback_queue: + _log_debug("Processing callbacks queue...") + try: + callback, args = _callback_queue.popleft() + if callback is not None: + _log_debug(f"Executing callback {callback} with args {args}") + try: + callback(*args) + except Exception as e: + _log_error(f"Error in callback {callback}: {e}") + else: + _log_debug("Skipping None callback") + except IndexError: + _log_debug("Callback queue empty") + break + await asyncio.sleep(0.01) # Yield to other tasks class WebSocketApp: def __init__( @@ -108,7 +113,9 @@ class WebSocketApp: def send(self, data, opcode=ABNF.OPCODE_TEXT): """Send a message.""" if not self.ws or not self.running: + _log_error("Send failed: Connection closed or not running") raise WebSocketConnectionClosedException("Connection is already closed.") + _log_debug(f"Scheduling send: opcode={opcode}, data={str(data)[:20]}...") asyncio.create_task(self._send_async(data, opcode)) def send_text(self, text_data): @@ -121,26 +128,35 @@ class WebSocketApp: def close(self, **kwargs): """Close the WebSocket connection.""" + _log_debug("Close requested") self.running = False asyncio.create_task(self._close_async()) async def _close_async(self): """Async close implementation.""" + _log_debug("Closing WebSocket connection") try: if self.ws and not self.ws.ws.closed: + _log_debug("Sending WebSocket close frame") await self.ws.close() + else: + _log_debug("WebSocket already closed or not initialized") if self.session: + _log_debug("Closing ClientSession") await self.session.__aexit__(None, None, None) + else: + _log_debug("No ClientSession to close") except Exception as e: _log_error(f"Error closing WebSocket: {e}") - def _start_ping_thread(self): + def _start_ping_task(self): """Start ping task.""" if self.ping_interval: - asyncio.create_task(self._send_ping_async()) + _log_debug(f"NOT Starting ping task with interval {self.ping_interval}s") + #asyncio.create_task(self._send_ping_async()) def _stop_ping_thread(self): - """No-op, ping handled in async loop.""" + """No-op, ping handled in async task.""" pass async def _send_ping_async(self): @@ -148,16 +164,20 @@ class WebSocketApp: while self.running and self.ping_interval: self.last_ping_tm = time.time() try: - - #await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload) - _log_debug("NOT Sending ping because it seems corrupt") + if self.ws and not self.ws.ws.closed: + _log_debug(f"Sending ping with payload: {self.ping_payload}") + await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload) + else: + _log_debug("Skipping ping: WebSocket not connected") except Exception as e: - _log_debug(f"Failed to send ping: {e}") + _log_error(f"Failed to send ping: {e}") await asyncio.sleep(self.ping_interval) def ready(self): """Check if connection is active.""" - return self.ws is not None and self.running + status = self.ws is not None and self.running + _log_debug(f"Connection status: ready={status}") + return status def run_forever( self, @@ -180,6 +200,7 @@ class WebSocketApp: reconnect=None, ): """Run the WebSocket event loop in the main thread.""" + _log_debug("Starting run_forever") if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type: raise WebSocketException("Proxy and sockopt not supported in MicroPython") if dispatcher: @@ -200,17 +221,19 @@ class WebSocketApp: try: self._loop.run_until_complete(self._async_main()) except KeyboardInterrupt: - print("run_forever got KeyboardInterrupt") + _log_debug("run_forever got KeyboardInterrupt") self.close() return False except Exception as e: - _log_error(f"run_forever got general exception: {e} - returning True") + _log_error(f"run_forever got general exception: {e}") self.has_errored = True return True + _log_debug("run_forever completed") return self.has_errored async def _async_main(self): """Main async loop for WebSocket handling.""" + _log_debug("Starting _async_main") reconnect = 0 # Default, as RECONNECT may not be defined try: from websocket import RECONNECT @@ -219,10 +242,14 @@ class WebSocketApp: pass if reconnect is not None: reconnect = reconnect + _log_debug(f"Reconnect interval set to {reconnect}s") + + # Start callback processing task + callback_task = asyncio.create_task(_process_callbacks_async()) + _log_debug("Started callback processing task") while self.running: - print("self.running") - time.sleep(1) + _log_debug("Main loop iteration: self.running=True") try: await self._connect_and_run() except Exception as e: @@ -230,46 +257,55 @@ class WebSocketApp: self.has_errored = True _run_callback(self.on_error, self, e) if not reconnect: + _log_debug("No reconnect configured, breaking loop") break - _log_debug(f"Reconnecting after error: {e}") + _log_debug(f"Reconnecting after error in {reconnect}s") await asyncio.sleep(reconnect) if self.on_reconnect: _run_callback(self.on_reconnect, self) # Cleanup + _log_debug("Initiating cleanup") self.running = False - _log_debug("websocket.py: closing...") + callback_task.cancel() # Stop callback task + try: + await callback_task + except asyncio.CancelledError: + _log_debug("Callback task cancelled") await self._close_async() _run_callback(self.on_close, self, None, None) + _log_debug("_async_main completed") async def _connect_and_run(self): """Connect and handle WebSocket messages.""" + _log_debug(f"Connecting to {self.url}") ssl_context = None if self.url.startswith("wss://"): import ssl ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_context.verify_mode = ssl.CERT_NONE + _log_debug("Using SSL with no certificate verification") self.session = aiohttp.ClientSession(headers=self.header) async with self.session.ws_connect(self.url, ssl=ssl_context) as ws: self.ws = ws - print("running on_open callback...") + _log_debug("WebSocket connected, running on_open callback") _run_callback(self.on_open, self) - print("done running on_open callback.") - self._start_ping_thread() + self._start_ping_task() async for msg in ws: - print(f"websocket.py received msg: type {msg.type} - {msg.data[0:20]}") + _log_debug(f"Received msg: type={msg.type}, data={str(msg.data)[:30]}...") if not self.running: + _log_debug("Not running, breaking message loop") break # Handle ping/pong timeout if self.ping_timeout and self.last_ping_tm: if time.time() - self.last_ping_tm > self.ping_timeout: + _log_error("Ping/pong timed out") raise WebSocketTimeoutException("ping/pong timed out") # Process message - _process_callbacks() # Process callbacks in same thread if msg.type == WSMsgType.TEXT: data = msg.data _run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True) @@ -279,10 +315,12 @@ class WebSocketApp: _run_callback(self.on_data, self, data, ABNF.OPCODE_BINARY, True) _run_callback(self.on_message, self, data) elif msg.type == WSMsgType.ERROR or ws.ws.closed: + _log_error("WebSocket error or closed") raise WebSocketConnectionClosedException("WebSocket closed") async def _send_async(self, data, opcode): """Async send implementation.""" + _log_debug(f"Sending: opcode={opcode}, data={str(data)[:20]}...") try: if opcode == ABNF.OPCODE_TEXT: await self.ws.send_str(data) @@ -290,7 +328,9 @@ class WebSocketApp: await self.ws.send_bytes(data) else: raise WebSocketException(f"Unsupported opcode: {opcode}") + _log_debug("Send successful") except Exception as e: + _log_error(f"Send failed: {e}") _run_callback(self.on_error, self, e) def _callback(self, callback, *args): @@ -299,8 +339,10 @@ class WebSocketApp: def _get_close_args(self, close_frame): """Extract close code and reason (simplified).""" + _log_debug("Getting close args (not supported)") return [None, None] # aiohttp doesn't provide close frame details def create_dispatcher(self, ping_timeout, dispatcher, is_ssl, handleDisconnect): """Not supported.""" + _log_error("Custom dispatcher not supported") raise WebSocketException("Custom dispatcher not supported")