more test code

This commit is contained in:
Thomas Farstrike
2025-05-19 15:29:57 +02:00
parent 4a27a1316d
commit 26fce3b874
3 changed files with 134 additions and 0 deletions
+69
View File
@@ -0,0 +1,69 @@
import uasyncio as asyncio
import sys
from contextlib import suppress
import aiohttp
import _thread
# Shared buffer for input lines
input_buffer = []
lock = _thread.allocate_lock() # Thread-safe access to buffer
# Thread function to read input and add to buffer
def read_input_thread():
while True:
line = input()
with lock:
input_buffer.append(line)
if line == "exit":
break
async def start_client(url: str) -> None:
name = input("Please enter your name: ")
# Start the input reading thread
_thread.start_new_thread(read_input_thread, ())
async def dispatch(ws: aiohttp.ClientWebSocketResponse) -> None:
while True:
#msg = await ws.receive()
msg = await ws.__anext__()
if msg.type is aiohttp.WSMsgType.TEXT:
print("Text: ", msg.data.strip())
elif msg.type is aiohttp.WSMsgType.BINARY:
print("Binary: ", msg.data)
elif msg.type is aiohttp.WSMsgType.PING:
await ws.pong()
elif msg.type is aiohttp.WSMsgType.PONG:
print("Pong received")
else:
if msg.type is aiohttp.WSMsgType.CLOSE:
await ws.close()
elif msg.type is aiohttp.WSMsgType.ERROR:
print("Error during receive %s" % ws.exception())
elif msg.type is aiohttp.WSMsgType.CLOSED:
pass
break
async with aiohttp.ClientSession() as session:
async with session.ws_connect(url) as ws:
dispatch_task = asyncio.create_task(dispatch(ws))
# Poll the input buffer instead of to_thread
while True:
line = None
with lock:
if input_buffer: # Check if there's input
line = input_buffer.pop(0) # Get the first line
if line:
await ws.send_str(name + ": " + line)
if line == "exit": # Stop on "exit"
break
await asyncio.sleep_ms(100) # Avoid busy-waiting
dispatch_task.cancel()
with suppress(asyncio.CancelledError):
await dispatch_task
# Run the client
asyncio.run(start_client("wss://echo.websocket.events"))
@@ -0,0 +1,49 @@
# it's not super fast but it works!
import websocket
import _thread
import time
def on_message(wsapp, message):
print(f"got message: {message}")
def on_ping(wsapp, message):
print("Got a ping! A pong reply has already been automatically sent.")
def on_pong(wsapp, message):
print("Got a pong! No need to respond")
def on_error(wsapp, message):
print(f"Got error: {message}")
#wsapp = websocket.WebSocketApp("wss://testnet.binance.vision/ws/btcusdt@trade", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error)
wsapp = websocket.WebSocketApp("wss://echo.websocket.events", on_message=on_message, on_ping=on_ping, on_pong=on_pong, on_error=on_error)
def stress_test_thread():
print("before run_forever")
wsapp.run_forever(ping_interval=15, ping_timeout=10, ping_payload="This is an optional ping payload")
print("after run_forever")
_thread.stack_size(16*1024)
_thread.start_new_thread(stress_test_thread, ())
time.sleep(5)
print("sending ok")
wsapp.send_text('ok')
time.sleep(15)
print("sending again")
wsapp.send_text('again')
time.sleep(25)
print("sending more")
wsapp.send_text('more')
wsapp.close()
@@ -0,0 +1,16 @@
from websocket import WebSocketApp
def on_message(ws, message):
print(f"Received: {message}")
def on_open(ws):
ws.send_text("Hello, Nostr!")
ws = WebSocketApp(
url="wss://relay.damus.io",
on_open=on_open,
on_message=on_message,
on_error=lambda ws, e: print(f"Error: {e}"),
on_close=lambda ws, code, reason: print("Closed")
)
ws.run_forever(ping_interval=30, ping_timeout=10)