From 2f0546e0d37f8fd983825fb87d1ec5037ce35f9d Mon Sep 17 00:00:00 2001 From: Thomas Farstrike Date: Sun, 18 May 2025 17:12:58 +0200 Subject: [PATCH] websockets: less verbose --- draft_code/modwebsocket_test.py | 127 ++++++++------------------------ 1 file changed, 29 insertions(+), 98 deletions(-) diff --git a/draft_code/modwebsocket_test.py b/draft_code/modwebsocket_test.py index ec547df1..07f07567 100644 --- a/draft_code/modwebsocket_test.py +++ b/draft_code/modwebsocket_test.py @@ -2,30 +2,13 @@ import socket import ssl import ubinascii from websocket import websocket -import time import select -import gc - -# Log memory usage -def log_memory(): - gc.collect() - print('Free memory:', gc.mem_free()) - -# Connect to Wi-Fi (disabled as per your code) -if False: - wlan = network.WLAN(network.STA_IF) - wlan.active(True) - wlan.connect('your_ssid', 'your_password') - while not wlan.isconnected(): - pass - print('Connected:', wlan.ifconfig()) # Resolve hostname -# Option 1: ws.postman-echo.com (recommended for reliable echo) host = 'ws.postman-echo.com' port = 443 handshake_path = '/raw' -# Option 2: echo.websocket.events (unreliable) +# Option: echo.websocket.events (unreliable) # host = 'echo.websocket.events' # handshake_path = '/' @@ -50,16 +33,13 @@ except Exception as e: try: ssl_sock = ssl.wrap_socket(sock, server_hostname=host) print('SSL connection established') - print('SSL cipher:', ssl_sock.cipher()) except Exception as e: print('SSL wrap failed:', e) sock.close() raise -log_memory() # Set socket to non-blocking ssl_sock.setblocking(False) -print('Socket set to non-blocking') # Perform WebSocket handshake key = ubinascii.b2a_base64(b'random_bytes_here').strip() @@ -73,113 +53,64 @@ handshake = ( '\r\n' ).format(handshake_path, host, key.decode()) -# Send handshake request try: - bytes_written = ssl_sock.write(handshake.encode()) - print('Handshake sent, bytes written:', bytes_written) - print('Handshake request:', handshake) + ssl_sock.write(handshake.encode()) + print('Handshake sent') except Exception as e: print('Failed to send handshake:', e) ssl_sock.close() raise -log_memory() # Read HTTP response until \r\n\r\n response_bytes = bytearray() -max_read = 1024 -read_timeout = 5 # Increased to 5s for server response -start_time = time.time() poller = select.poll() poller.register(ssl_sock, select.POLLIN) -while len(response_bytes) < max_read: - events = poller.poll(100) # Wait 100ms - print('Poll events:', events) - if not events: - if time.time() - start_time > read_timeout: - print('Read timeout reached') - break - continue - try: +max_polls = 50 # 5s timeout (50 * 100ms) +for poll_count in range(max_polls): + if poller.poll(100): chunk = ssl_sock.read(128) - print('Read attempt, chunk:', chunk) if chunk is None: - print('Read returned None, continuing to poll') continue if not chunk: - print('No more data received (EOF)') break - print('Received chunk, length:', len(chunk), 'bytes:', chunk) response_bytes.extend(chunk) - print('Total bytes received:', len(response_bytes)) if b'\r\n\r\n' in response_bytes: - print('End of HTTP headers detected') - http_end = response_bytes.find(b'\r\n\r\n') + 4 - if http_end < 4: - ssl_sock.close() - raise Exception('Invalid HTTP response: no headers found') - http_response_bytes = response_bytes[:http_end] + http_response_bytes = response_bytes[:response_bytes.find(b'\r\n\r\n') + 4] try: response = http_response_bytes.decode('utf-8') - print('Decoded HTTP response:', response) + if '101 Switching Protocols' not in response: + raise Exception('Handshake failed') + print('Handshake successful') + break except UnicodeError as e: - print('UnicodeError during decode:', e) - printable = ''.join(c if 32 <= ord(c) < 127 else '.' for c in http_response_bytes.decode('latin-1')) - print('Printable characters:', printable) + print('UnicodeError:', e) ssl_sock.close() raise Exception('Failed to decode HTTP response') - if '101 Switching Protocols' not in response: - print('Handshake response:', response) - ssl_sock.close() - raise Exception('Handshake failed') - print('Stopping read to preserve WebSocket frame') - break - except Exception as e: - print('Error reading chunk:', e) - break -log_memory() + else: + continue +else: + ssl_sock.close() + print('Handshake timeout: No response received after {} seconds ({} bytes received)'.format(max_polls * 0.1, len(response_bytes))) + raise Exception('Handshake timeout') # Create WebSocket object -try: - ws = websocket(ssl_sock, True) - print('WebSocket object created') -except Exception as e: - print('Failed to create WebSocket object:', e) - ssl_sock.close() - raise -log_memory() +ws = websocket(ssl_sock, True) +print('WebSocket object created') -# Send and receive data with polling -try: - bytes_written = ws.write('Hello, Secure WebSocket!') - print('Sent message, bytes written:', bytes_written) -except Exception as e: - print('Failed to send message:', e) - ws.close() - raise - -# Poll for data with retries -max_attempts = 10 # Increased retries -poll_timeout = 100 # 100ms per poll -start_time = time.time() +# Send and receive data +ws.write('Hello, Secure WebSocket!') +max_attempts = 5 for attempt in range(max_attempts): - events = poller.poll(poll_timeout) - print('Read poll attempt', attempt + 1, 'events:', events) - if not events: - print('Read attempt', attempt + 1, 'no data available') - if time.time() - start_time > 2: # 2s total timeout - print('Read timeout reached') - break - continue - try: + if poller.poll(100): data = ws.read(1024) - print('Read attempt', attempt + 1, 'received:', data) if data: + print('Received:', data) break - except Exception as e: - print('Read attempt', attempt + 1, 'error:', e) - time.sleep(0.1) + else: + print('Read attempt', attempt + 1, 'no data') +else: + print('Read timeout: No response received after {} attempts ({} seconds)'.format(max_attempts, max_attempts * 0.1)) # Close connection ws.close() print('Connection closed') -log_memory()