You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Less verbose
This commit is contained in:
@@ -72,16 +72,16 @@ def ws_handshake(host, port, path="/"):
|
||||
return ssl_sock
|
||||
|
||||
def ws_send_text(sock, message):
|
||||
print(f"Sending: {message}")
|
||||
#print(f"Sending: {message}")
|
||||
data = message.encode()
|
||||
print(f"Data: {data}")
|
||||
#print(f"Data: {data}")
|
||||
length = len(data)
|
||||
|
||||
frame = bytearray()
|
||||
frame.append(0x81) # FIN=1, opcode=0x1 (text)
|
||||
mask_bit = 0x80
|
||||
mask_key = bytearray(urandom.getrandbits(32).to_bytes(4, 'big'))
|
||||
print(f"Mask key: {mask_key}")
|
||||
#print(f"Mask key: {mask_key}")
|
||||
|
||||
if length <= 125:
|
||||
frame.append(mask_bit | length)
|
||||
@@ -97,7 +97,7 @@ def ws_send_text(sock, message):
|
||||
for i in range(len(data)):
|
||||
masked_data[i] ^= mask_key[i % 4]
|
||||
frame.extend(masked_data)
|
||||
print(f"Frame: {frame}")
|
||||
#print(f"Frame: {frame}")
|
||||
sock.write(frame)
|
||||
|
||||
def ws_read_frame(sock):
|
||||
@@ -109,7 +109,7 @@ def ws_read_frame(sock):
|
||||
if not data:
|
||||
print("Read: empty")
|
||||
return None
|
||||
print(f"Raw header: {data}")
|
||||
#print(f"Raw header: {data}")
|
||||
|
||||
if len(data) < 2:
|
||||
print("Frame too short")
|
||||
@@ -119,29 +119,29 @@ def ws_read_frame(sock):
|
||||
opcode = data[0] & 0x0F
|
||||
masked = (data[1] & 0x80) >> 7
|
||||
payload_len = data[1] & 0x7F
|
||||
print(f"FIN: {fin} Opcode: {opcode} Masked: {masked} Len: {payload_len}")
|
||||
#print(f"FIN: {fin} Opcode: {opcode} Masked: {masked} Len: {payload_len}")
|
||||
|
||||
if payload_len == 126:
|
||||
len_data = sock.read(2)
|
||||
payload_len = int.from_bytes(len_data, 'big')
|
||||
print(f"Extended len: {payload_len}")
|
||||
#print(f"Extended len: {payload_len}")
|
||||
elif payload_len == 127:
|
||||
len_data = sock.read(8)
|
||||
payload_len = int.from_bytes(len_data, 'big')
|
||||
print(f"Extended len: {payload_len}")
|
||||
#print(f"Extended len: {payload_len}")
|
||||
|
||||
mask_key = None
|
||||
if masked:
|
||||
mask_key = sock.read(4)
|
||||
print(f"Mask key: {mask_key}")
|
||||
#print(f"Mask key: {mask_key}")
|
||||
|
||||
payload = sock.read(payload_len)
|
||||
print(f"Raw payload: {payload}")
|
||||
#print(f"Raw payload: {payload}")
|
||||
if masked and mask_key:
|
||||
payload = bytearray(payload)
|
||||
for i in range(len(payload)):
|
||||
payload[i] ^= mask_key[i % 4]
|
||||
print(f"Payload: {payload}")
|
||||
#print(f"Payload: {payload}")
|
||||
|
||||
if opcode == 0x1:
|
||||
return payload.decode()
|
||||
|
||||
Reference in New Issue
Block a user