websocket.py: fix decue()

This commit is contained in:
Thomas Farstrike
2025-05-19 15:09:48 +02:00
parent 9ff5dfa41a
commit cf681c2796
+306
View File
@@ -0,0 +1,306 @@
# websocket.py
# MicroPython WebSocketApp implementation for python-nostr port
# Compatible with websocket-client's WebSocketApp API, using MicroPython aiohttp
import uasyncio as asyncio
import _thread
import time
import ucollections
import aiohttp
from aiohttp import WSMsgType, ClientWebSocketResponse
# Simplified logging for MicroPython
def _log_debug(msg):
print(f"DEBUG: {msg}")
def _log_error(msg):
print(f"ERROR: {msg}")
# Simplified ABNF for opcode compatibility
class ABNF:
OPCODE_TEXT = 1
OPCODE_BINARY = 2
OPCODE_CLOSE = 8
OPCODE_PING = 9
OPCODE_PONG = 10
# Exceptions
class WebSocketException(Exception):
pass
class WebSocketConnectionClosedException(WebSocketException):
pass
class WebSocketTimeoutException(WebSocketException):
pass
# Queue for cross-thread callback dispatching
_callback_queue = ucollections.deque((), 100) # Empty tuple, maxlen=100
def _run_callback(callback, *args):
"""Add callback to queue for main thread execution."""
try:
_callback_queue.append((callback, args))
except IndexError:
_log_error("Callback queue full, dropping callback")
def _process_callbacks():
"""Process queued callbacks in the main thread."""
while _callback_queue:
try:
callback, args = _callback_queue.popleft()
if callback is not None:
try:
callback(*args)
except Exception as e:
_log_error(f"Error in callback {callback}: {e}")
else:
print("Not calling None callback")
except IndexError:
break # Queue is empty
class WebSocketApp:
def __init__(
self,
url,
header=None,
on_open=None,
on_reconnect=None,
on_message=None,
on_error=None,
on_close=None,
on_ping=None,
on_pong=None,
on_cont_message=None,
keep_running=True, # Ignored for compatibility
get_mask_key=None,
cookie=None,
subprotocols=None,
on_data=None,
socket=None,
):
self.url = url
self.header = header if header is not None else {}
self.cookie = cookie
self.on_open = on_open
self.on_reconnect = on_reconnect
self.on_message = on_message
self.on_data = on_data
self.on_error = on_error
self.on_close = on_close
self.on_ping = on_ping
self.on_pong = on_pong
self.on_cont_message = on_cont_message
self.get_mask_key = get_mask_key
self.subprotocols = subprotocols
self.prepared_socket = socket # Ignored, not supported
self.ws = None
self.session = None
self.running = False
self.thread = None
self.ping_interval = 0
self.ping_timeout = None
self.ping_payload = ""
self.last_ping_tm = 0
self.last_pong_tm = 0
self.has_errored = False
def send(self, data, opcode=ABNF.OPCODE_TEXT):
"""Send a message."""
if not self.ws or not self.running:
raise WebSocketConnectionClosedException("Connection is already closed.")
# Schedule send in async loop
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self._send_async(data, opcode), loop)
def send_text(self, text_data):
"""Send UTF-8 text."""
self.send(text_data, ABNF.OPCODE_TEXT)
def send_bytes(self, data):
"""Send binary data."""
self.send(data, ABNF.OPCODE_BINARY)
def close(self, **kwargs):
"""Close the WebSocket connection."""
self.running = False
if self.ws:
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.ws.close(), loop)
if self.session:
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self.session.__aexit__(None, None, None), loop)
def _start_ping_thread(self):
"""Simulate ping/pong in async loop."""
if self.ping_interval:
loop = asyncio.get_event_loop()
asyncio.run_coroutine_threadsafe(self._send_ping_async(), loop)
def _stop_ping_thread(self):
"""No-op, ping handled in async loop."""
pass
async def _send_ping_async(self):
"""Send periodic pings."""
while self.running and self.ping_interval:
self.last_ping_tm = time.time()
try:
await self.ws.send_bytes(self.ping_payload.encode() if isinstance(self.ping_payload, str) else self.ping_payload)
_log_debug("Sending ping")
except Exception as e:
_log_debug(f"Failed to send ping: {e}")
await asyncio.sleep(self.ping_interval)
def ready(self):
"""Check if connection is active."""
return self.ws is not None and self.running
def run_forever(
self,
sockopt=None,
sslopt=None,
ping_interval=0,
ping_timeout=None,
ping_payload="",
http_proxy_host=None,
http_proxy_port=None,
http_no_proxy=None,
http_proxy_auth=None,
http_proxy_timeout=None,
skip_utf8_validation=False,
host=None,
origin=None,
dispatcher=None,
suppress_origin=False,
proxy_type=None,
reconnect=None,
):
"""Run the WebSocket event loop."""
if sockopt or http_proxy_host or http_proxy_port or http_no_proxy or http_proxy_auth or proxy_type:
raise WebSocketException("Proxy and sockopt not supported in MicroPython")
if dispatcher:
raise WebSocketException("Custom dispatcher not supported")
if ping_timeout is not None and ping_timeout <= 0:
raise WebSocketException("Ensure ping_timeout > 0")
if ping_interval is not None and ping_interval < 0:
raise WebSocketException("Ensure ping_interval >= 0")
if ping_timeout and ping_interval and ping_interval <= ping_timeout:
raise WebSocketException("Ensure ping_interval > ping_timeout")
self.ping_interval = ping_interval
self.ping_timeout = ping_timeout
self.ping_payload = ping_payload
self.running = True
# Start async event loop in a separate thread
self.thread = _thread.start_new_thread(self._run_async_loop, ())
# Main thread processes callbacks
try:
while self.running:
_process_callbacks()
time.sleep(0.01) # Yield to other tasks
except KeyboardInterrupt:
self.close()
return False
return self.has_errored
def _run_async_loop(self):
"""Run uasyncio event loop in a separate thread."""
loop = asyncio.get_event_loop()
loop.run_until_complete(self._async_main())
loop.run_forever()
async def _async_main(self):
"""Main async loop for WebSocket handling."""
reconnect = 0 # Default, as RECONNECT may not be defined
try:
from websocket import RECONNECT
reconnect = RECONNECT
except ImportError:
pass
if reconnect is not None:
reconnect = reconnect
while self.running:
try:
await self._connect_and_run()
except Exception as e:
print(f"_async_main got exception {e}")
self.has_errored = True
_run_callback(self.on_error, self, e)
if not reconnect:
break
_log_debug(f"Reconnecting after error: {e}")
await asyncio.sleep(reconnect)
if self.on_reconnect:
_run_callback(self.on_reconnect, self)
# Cleanup
self.running = False
if self.ws:
print("websocket.py: closing...")
await self.ws.close()
if self.session:
await self.session.__aexit__(None, None, None)
_run_callback(self.on_close, self, None, None)
async def _connect_and_run(self):
"""Connect and handle WebSocket messages."""
ssl_context = None
if self.url.startswith("wss://"):
import ssl
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_NONE
self.session = aiohttp.ClientSession(headers=self.header)
async with self.session.ws_connect(self.url, ssl=ssl_context) as ws:
self.ws = ws
_run_callback(self.on_open, self)
self._start_ping_thread()
async for msg in ws:
if not self.running:
break
# Handle ping/pong timeout
if self.ping_timeout and self.last_ping_tm:
if time.time() - self.last_ping_tm > self.ping_timeout:
raise WebSocketTimeoutException("ping/pong timed out")
# Process message
if msg.type == WSMsgType.TEXT:
data = msg.data
_run_callback(self.on_data, self, data, ABNF.OPCODE_TEXT, True)
_run_callback(self.on_message, self, data)
elif msg.type == WSMsgType.BINARY:
data = msg.data
_run_callback(self.on_data, self, data, ABNF.OPCODE_BINARY, True)
_run_callback(self.on_message, self, data)
elif msg.type == WSMsgType.ERROR or ws.ws.closed:
raise WebSocketConnectionClosedException("WebSocket closed")
async def _send_async(self, data, opcode):
"""Async send implementation."""
try:
if opcode == ABNF.OPCODE_TEXT:
await self.ws.send_str(data)
elif opcode == ABNF.OPCODE_BINARY:
await self.ws.send_bytes(data)
else:
raise WebSocketException(f"Unsupported opcode: {opcode}")
except Exception as e:
_run_callback(self.on_error, self, e)
def _callback(self, callback, *args):
"""Compatibility wrapper for callback execution."""
_run_callback(callback, self, *args)
def _get_close_args(self, close_frame):
"""Extract close code and reason (simplified)."""
return [None, None] # aiohttp doesn't provide close frame details
def create_dispatcher(self, ping_timeout, dispatcher, is_ssl, handleDisconnect):
"""Not supported."""
raise WebSocketException("Custom dispatcher not supported")