Files
MicroPythonOS/draft_code/modwebsocket_test.py
T

117 lines
3.1 KiB
Python
Raw Normal View History

2025-05-18 16:50:01 +02:00
import socket
2025-05-18 16:56:11 +02:00
import ssl
2025-05-18 16:50:01 +02:00
import ubinascii
from websocket import websocket
2025-05-18 17:12:05 +02:00
import select
2025-05-18 16:59:52 +02:00
2025-05-18 16:56:11 +02:00
# Endpoint used for the secure WebSocket echo test.
host = 'ws.postman-echo.com'
port = 443
handshake_path = '/raw'

# Alternative endpoint: echo.websocket.events (unreliable).
# Uncomment both lines below to switch to it.
#host = 'echo.websocket.events'
#handshake_path = '/'

# Resolve the hostname. The last field of the first getaddrinfo() entry is
# kept as addr_info and later passed to connect().
try:
    first_entry = socket.getaddrinfo(host, port)[0]
    addr_info = first_entry[-1]
except Exception as err:
    print('DNS resolution failed:', err)
    raise
else:
    print('Resolved address:', addr_info)
2025-05-18 16:50:01 +02:00
# Open a TCP connection to the resolved address. If connecting fails, the
# socket is closed before the error is re-raised.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect(addr_info)
except Exception as err:
    print('Connect failed:', err)
    sock.close()
    raise
else:
    print('Socket connected')

# Upgrade the connected socket to TLS; the raw socket is closed on failure.
# NOTE(review): no CA data or cert_reqs is passed here, so the server
# certificate is presumably not verified -- confirm against the ssl module
# of the target MicroPython port.
try:
    ssl_sock = ssl.wrap_socket(sock, server_hostname=host)
except Exception as err:
    print('SSL wrap failed:', err)
    sock.close()
    raise
else:
    print('SSL connection established')
2025-05-18 16:50:01 +02:00
2025-05-18 17:09:34 +02:00
# Perform WebSocket handshake (client request).
#
# The request is written while the socket is still in blocking mode. The
# write result is not checked here, so sending before the switch to
# non-blocking avoids silently losing part of the request to a short write.
import os  # local import: only needed for the handshake nonce

# RFC 6455 section 4.1: Sec-WebSocket-Key must be the base64 encoding of a
# randomly selected 16-byte nonce, freshly generated for each connection.
key = ubinascii.b2a_base64(os.urandom(16)).strip()
handshake = (
    'GET {} HTTP/1.1\r\n'
    'Host: {}\r\n'
    'Upgrade: websocket\r\n'
    'Connection: Upgrade\r\n'
    'Sec-WebSocket-Key: {}\r\n'
    'Sec-WebSocket-Version: 13\r\n'
    '\r\n'
).format(handshake_path, host, key.decode())
try:
    ssl_sock.write(handshake.encode())
    print('Handshake sent')
except Exception as e:
    print('Failed to send handshake:', e)
    ssl_sock.close()
    raise

# Set socket to non-blocking: from here on, reads are driven by poll().
ssl_sock.setblocking(False)
2025-05-18 17:12:05 +02:00
# Read the HTTP response to the upgrade request until the header terminator
# (\r\n\r\n) arrives, then verify that the server answered with status 101.
#
# Every failure path (peer closed the connection, undecodable response,
# non-101 status, timeout) closes ssl_sock and raises, so the script never
# continues with a half-open or rejected connection.
#
# NOTE: bytes received after the header terminator stay in response_bytes
# and are not passed on to the WebSocket layer.
response_bytes = bytearray()
poller = select.poll()
poller.register(ssl_sock, select.POLLIN)
max_polls = 50  # 5s timeout (50 * 100ms)
for poll_count in range(max_polls):
    if not poller.poll(100):
        # Nothing became readable within this 100 ms slice.
        continue
    chunk = ssl_sock.read(128)
    if chunk is None:
        # Non-blocking read had no data available yet; poll again.
        continue
    if not chunk:
        # Empty read: the peer closed the connection before the handshake
        # completed. Fail here instead of falling through to the WebSocket.
        ssl_sock.close()
        print('Handshake aborted: connection closed after {} bytes'.format(len(response_bytes)))
        raise Exception('Connection closed during handshake')
    response_bytes.extend(chunk)
    header_end = response_bytes.find(b'\r\n\r\n')
    if header_end < 0:
        # Header block is not complete yet.
        continue
    http_response_bytes = response_bytes[:header_end + 4]
    try:
        response = http_response_bytes.decode('utf-8')
    except UnicodeError as e:
        print('UnicodeError:', e)
        ssl_sock.close()
        raise Exception('Failed to decode HTTP response')
    # Check the status code on the status line itself rather than searching
    # the whole header block for a particular reason phrase.
    status_fields = response.split('\r\n', 1)[0].split(' ', 2)
    if len(status_fields) < 2 or status_fields[1] != '101':
        ssl_sock.close()
        raise Exception('Handshake failed')
    print('Handshake successful')
    break
else:
    # The loop ran out of polls without seeing a complete header block.
    ssl_sock.close()
    print('Handshake timeout: No response received after {} seconds ({} bytes received)'.format(max_polls * 0.1, len(response_bytes)))
    raise Exception('Handshake timeout')
2025-05-18 16:59:52 +02:00
2025-05-18 16:50:01 +02:00
# Create WebSocket object on top of the TLS stream. If construction fails,
# the TLS socket is closed before the error propagates.
try:
    ws = websocket(ssl_sock, True)
except Exception:
    ssl_sock.close()
    raise
print('WebSocket object created')

# Send one message and wait for a reply, polling in 100 ms slices.
# The try/finally guarantees ws.close() runs even if write/read raises.
# NOTE(review): ws.close() is presumably what releases the underlying TLS
# socket as well -- confirm against the websocket module in use.
try:
    ws.write('Hello, Secure WebSocket!')
    max_attempts = 5
    for attempt in range(max_attempts):
        if poller.poll(100):
            data = ws.read(1024)
            if data:
                print('Received:', data)
                break
        else:
            print('Read attempt', attempt + 1, 'no data')
    else:
        # No attempt produced data.
        print('Read timeout: No response received after {} attempts ({} seconds)'.format(max_attempts, max_attempts * 0.1))
finally:
    # Close connection
    ws.close()
    print('Connection closed')