websockets: less verbose

This commit is contained in:
Thomas Farstrike
2025-05-18 17:12:58 +02:00
parent 23598b0212
commit 2f0546e0d3
+29 -98
View File
@@ -2,30 +2,13 @@ import socket
import ssl
import ubinascii
from websocket import websocket
import time
import select
import gc
# Log memory usage
def log_memory():
gc.collect()
print('Free memory:', gc.mem_free())
# Connect to Wi-Fi (disabled as per your code)
if False:
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect('your_ssid', 'your_password')
while not wlan.isconnected():
pass
print('Connected:', wlan.ifconfig())
# Resolve hostname
# Option 1: ws.postman-echo.com (recommended for reliable echo)
host = 'ws.postman-echo.com'
port = 443
handshake_path = '/raw'
# Option 2: echo.websocket.events (unreliable)
# Option: echo.websocket.events (unreliable)
# host = 'echo.websocket.events'
# handshake_path = '/'
@@ -50,16 +33,13 @@ except Exception as e:
try:
ssl_sock = ssl.wrap_socket(sock, server_hostname=host)
print('SSL connection established')
print('SSL cipher:', ssl_sock.cipher())
except Exception as e:
print('SSL wrap failed:', e)
sock.close()
raise
log_memory()
# Set socket to non-blocking
ssl_sock.setblocking(False)
print('Socket set to non-blocking')
# Perform WebSocket handshake
key = ubinascii.b2a_base64(b'random_bytes_here').strip()
@@ -73,113 +53,64 @@ handshake = (
'\r\n'
).format(handshake_path, host, key.decode())
# Send handshake request
try:
bytes_written = ssl_sock.write(handshake.encode())
print('Handshake sent, bytes written:', bytes_written)
print('Handshake request:', handshake)
ssl_sock.write(handshake.encode())
print('Handshake sent')
except Exception as e:
print('Failed to send handshake:', e)
ssl_sock.close()
raise
log_memory()
# Read HTTP response until \r\n\r\n
response_bytes = bytearray()
max_read = 1024
read_timeout = 5 # Increased to 5s for server response
start_time = time.time()
poller = select.poll()
poller.register(ssl_sock, select.POLLIN)
while len(response_bytes) < max_read:
events = poller.poll(100) # Wait 100ms
print('Poll events:', events)
if not events:
if time.time() - start_time > read_timeout:
print('Read timeout reached')
break
continue
try:
max_polls = 50 # 5s timeout (50 * 100ms)
for poll_count in range(max_polls):
if poller.poll(100):
chunk = ssl_sock.read(128)
print('Read attempt, chunk:', chunk)
if chunk is None:
print('Read returned None, continuing to poll')
continue
if not chunk:
print('No more data received (EOF)')
break
print('Received chunk, length:', len(chunk), 'bytes:', chunk)
response_bytes.extend(chunk)
print('Total bytes received:', len(response_bytes))
if b'\r\n\r\n' in response_bytes:
print('End of HTTP headers detected')
http_end = response_bytes.find(b'\r\n\r\n') + 4
if http_end < 4:
ssl_sock.close()
raise Exception('Invalid HTTP response: no headers found')
http_response_bytes = response_bytes[:http_end]
http_response_bytes = response_bytes[:response_bytes.find(b'\r\n\r\n') + 4]
try:
response = http_response_bytes.decode('utf-8')
print('Decoded HTTP response:', response)
if '101 Switching Protocols' not in response:
raise Exception('Handshake failed')
print('Handshake successful')
break
except UnicodeError as e:
print('UnicodeError during decode:', e)
printable = ''.join(c if 32 <= ord(c) < 127 else '.' for c in http_response_bytes.decode('latin-1'))
print('Printable characters:', printable)
print('UnicodeError:', e)
ssl_sock.close()
raise Exception('Failed to decode HTTP response')
if '101 Switching Protocols' not in response:
print('Handshake response:', response)
ssl_sock.close()
raise Exception('Handshake failed')
print('Stopping read to preserve WebSocket frame')
break
except Exception as e:
print('Error reading chunk:', e)
break
log_memory()
else:
continue
else:
ssl_sock.close()
print('Handshake timeout: No response received after {} seconds ({} bytes received)'.format(max_polls * 0.1, len(response_bytes)))
raise Exception('Handshake timeout')
# Create WebSocket object
try:
ws = websocket(ssl_sock, True)
print('WebSocket object created')
except Exception as e:
print('Failed to create WebSocket object:', e)
ssl_sock.close()
raise
log_memory()
ws = websocket(ssl_sock, True)
print('WebSocket object created')
# Send and receive data with polling
try:
bytes_written = ws.write('Hello, Secure WebSocket!')
print('Sent message, bytes written:', bytes_written)
except Exception as e:
print('Failed to send message:', e)
ws.close()
raise
# Poll for data with retries
max_attempts = 10 # Increased retries
poll_timeout = 100 # 100ms per poll
start_time = time.time()
# Send and receive data
ws.write('Hello, Secure WebSocket!')
max_attempts = 5
for attempt in range(max_attempts):
events = poller.poll(poll_timeout)
print('Read poll attempt', attempt + 1, 'events:', events)
if not events:
print('Read attempt', attempt + 1, 'no data available')
if time.time() - start_time > 2: # 2s total timeout
print('Read timeout reached')
break
continue
try:
if poller.poll(100):
data = ws.read(1024)
print('Read attempt', attempt + 1, 'received:', data)
if data:
print('Received:', data)
break
except Exception as e:
print('Read attempt', attempt + 1, 'error:', e)
time.sleep(0.1)
else:
print('Read attempt', attempt + 1, 'no data')
else:
print('Read timeout: No response received after {} attempts ({} seconds)'.format(max_attempts, max_attempts * 0.1))
# Close connection
ws.close()
print('Connection closed')
log_memory()