You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
256 lines
10 KiB
Python
256 lines
10 KiB
Python
import unittest
|
||
import _thread
|
||
import time
|
||
|
||
from mpos import App, PackageManager
|
||
import mpos.apps
|
||
|
||
from websocket import WebSocketApp
|
||
|
||
|
||
# demo_multiple_ws.py
|
||
import asyncio
|
||
import aiohttp
|
||
from aiohttp import WSMsgType
|
||
import logging
|
||
import sys
|
||
from typing import List
|
||
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Logging
|
||
# ----------------------------------------------------------------------
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s [%(levelname)s] %(message)s",
|
||
stream=sys.stdout,
|
||
)
|
||
log = logging.getLogger(__name__)
|
||
|
||
|
||
class TestTwoWebsockets(unittest.TestCase):
|
||
#class TestTwoWebsockets():
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Configuration
|
||
# ----------------------------------------------------------------------
|
||
# Change these to point to a real echo / chat server you control.
|
||
WS_URLS = [
|
||
"wss://echo.websocket.org", # public echo service (may be down)
|
||
"wss://echo.websocket.org", # duplicate on purpose – shows concurrency
|
||
"wss://echo.websocket.org",
|
||
# add more URLs here…
|
||
]
|
||
|
||
nr_connected = 0
|
||
|
||
# How many messages each connection should send before closing gracefully
|
||
MESSAGES_PER_CONNECTION = 2
|
||
STOP_AFTER = 10
|
||
|
||
# ----------------------------------------------------------------------
|
||
# One connection worker
|
||
# ----------------------------------------------------------------------
|
||
async def ws_worker(self, session: aiohttp.ClientSession, url: str, idx: int) -> None:
|
||
"""
|
||
Handles a single WebSocket connection:
|
||
* sends a few messages,
|
||
* echoes back everything it receives,
|
||
* closes when the remote end says "close" or after MESSAGES_PER_CONNECTION.
|
||
"""
|
||
try:
|
||
async with session.ws_connect(url) as ws:
|
||
log.info(f"[{idx}] Connected to {url}")
|
||
self.nr_connected += 1
|
||
|
||
# ------------------------------------------------------------------
|
||
# 1. Send a few starter messages
|
||
# ------------------------------------------------------------------
|
||
for i in range(self.MESSAGES_PER_CONNECTION):
|
||
payload = f"Hello from client #{idx} – msg {i+1}"
|
||
await ws.send_str(payload)
|
||
log.info(f"[{idx}] → {payload}")
|
||
|
||
# give the server a moment to reply
|
||
await asyncio.sleep(0.5)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 2. Echo-loop – react to incoming messages
|
||
# ------------------------------------------------------------------
|
||
msgcounter = 0
|
||
async for msg in ws:
|
||
msgcounter += 1
|
||
if msgcounter > self.STOP_AFTER:
|
||
print("Max reached, stopping...")
|
||
await ws.close()
|
||
break
|
||
if msg.type == WSMsgType.TEXT:
|
||
data: str = msg.data
|
||
log.info(f"[{idx}] ← {data}")
|
||
|
||
# Echo back (with a suffix)
|
||
reply = data + " / answer"
|
||
await ws.send_str(reply)
|
||
log.info(f"[{idx}] → {reply}")
|
||
|
||
# Close if server asks us to
|
||
if data.strip().lower() == "close cmd":
|
||
log.info(f"[{idx}] Server asked to close → closing")
|
||
await ws.close()
|
||
break
|
||
|
||
elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
|
||
log.info(f"[{idx}] Connection closed by remote")
|
||
break
|
||
|
||
elif msg.type == WSMsgType.ERROR:
|
||
log.error(f"[{idx}] WebSocket error: {ws.exception()}")
|
||
break
|
||
|
||
except asyncio.CancelledError:
|
||
log.info(f"[{idx}] Task cancelled")
|
||
raise
|
||
except Exception as exc:
|
||
log.exception(f"[{idx}] Unexpected error on {url}: {exc}")
|
||
finally:
|
||
log.info(f"[{idx}] Worker finished for {url}")
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Main entry point – creates a single ClientSession + many tasks
|
||
# ----------------------------------------------------------------------
|
||
async def main(self) -> None:
|
||
async with aiohttp.ClientSession() as session:
|
||
# Create one task per URL (they all run concurrently)
|
||
tasks = [
|
||
asyncio.create_task(self.ws_worker(session, url, idx))
|
||
for idx, url in enumerate(self.WS_URLS)
|
||
]
|
||
|
||
log.info(f"Starting {len(tasks)} concurrent WebSocket connections…")
|
||
# Wait for *all* of them to finish (or be cancelled)
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
log.info(f"All tasks stopped successfully!")
|
||
self.assertTrue(self.nr_connected, len(self.WS_URLS))
|
||
|
||
def newthread(self):
|
||
asyncio.run(self.main())
|
||
|
||
def test_it(self):
|
||
_thread.stack_size(mpos.apps.good_stack_size())
|
||
_thread.start_new_thread(self.newthread, ())
|
||
time.sleep(10)
|
||
|
||
# This demonstrates a crash when doing asyncio using different threads:
|
||
#class TestCrashingSeparateThreads(unittest.TestCase):
|
||
class TestCrashingSeparateThreads():
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Configuration
|
||
# ----------------------------------------------------------------------
|
||
# Change these to point to a real echo / chat server you control.
|
||
WS_URLS = [
|
||
"wss://echo.websocket.org", # public echo service (may be down)
|
||
"wss://echo.websocket.org", # duplicate on purpose – shows concurrency
|
||
"wss://echo.websocket.org",
|
||
# add more URLs here…
|
||
]
|
||
|
||
# How many messages each connection should send before closing gracefully
|
||
MESSAGES_PER_CONNECTION = 2
|
||
STOP_AFTER = 10
|
||
|
||
# ----------------------------------------------------------------------
|
||
# One connection worker
|
||
# ----------------------------------------------------------------------
|
||
async def ws_worker(self, session: aiohttp.ClientSession, url: str, idx: int) -> None:
|
||
"""
|
||
Handles a single WebSocket connection:
|
||
* sends a few messages,
|
||
* echoes back everything it receives,
|
||
* closes when the remote end says "close" or after MESSAGES_PER_CONNECTION.
|
||
"""
|
||
try:
|
||
async with session.ws_connect(url) as ws:
|
||
log.info(f"[{idx}] Connected to {url}")
|
||
|
||
# ------------------------------------------------------------------
|
||
# 1. Send a few starter messages
|
||
# ------------------------------------------------------------------
|
||
for i in range(self.MESSAGES_PER_CONNECTION):
|
||
payload = f"Hello from client #{idx} – msg {i+1}"
|
||
await ws.send_str(payload)
|
||
log.info(f"[{idx}] → {payload}")
|
||
|
||
# give the server a moment to reply
|
||
await asyncio.sleep(0.5)
|
||
|
||
# ------------------------------------------------------------------
|
||
# 2. Echo-loop – react to incoming messages
|
||
# ------------------------------------------------------------------
|
||
msgcounter = 0
|
||
async for msg in ws:
|
||
msgcounter += 1
|
||
if msgcounter > self.STOP_AFTER:
|
||
print("Max reached, stopping...")
|
||
await ws.close()
|
||
break
|
||
if msg.type == WSMsgType.TEXT:
|
||
data: str = msg.data
|
||
log.info(f"[{idx}] ← {data}")
|
||
|
||
# Echo back (with a suffix)
|
||
reply = data + " / answer"
|
||
await ws.send_str(reply)
|
||
log.info(f"[{idx}] → {reply}")
|
||
|
||
# Close if server asks us to
|
||
if data.strip().lower() == "close cmd":
|
||
log.info(f"[{idx}] Server asked to close → closing")
|
||
await ws.close()
|
||
break
|
||
|
||
elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
|
||
log.info(f"[{idx}] Connection closed by remote")
|
||
break
|
||
|
||
elif msg.type == WSMsgType.ERROR:
|
||
log.error(f"[{idx}] WebSocket error: {ws.exception()}")
|
||
break
|
||
|
||
except asyncio.CancelledError:
|
||
log.info(f"[{idx}] Task cancelled")
|
||
raise
|
||
except Exception as exc:
|
||
log.exception(f"[{idx}] Unexpected error on {url}: {exc}")
|
||
finally:
|
||
log.info(f"[{idx}] Worker finished for {url}")
|
||
|
||
# ----------------------------------------------------------------------
|
||
# Main entry point – creates a single ClientSession + many tasks
|
||
# ----------------------------------------------------------------------
|
||
async def main(self) -> None:
|
||
async with aiohttp.ClientSession() as session:
|
||
# Create one task per URL (they all run concurrently)
|
||
tasks = [
|
||
asyncio.create_task(self.ws_worker(session, url, idx))
|
||
for idx, url in enumerate(self.WS_URLS)
|
||
]
|
||
|
||
log.info(f"Starting {len(tasks)} concurrent WebSocket connections…")
|
||
# Wait for *all* of them to finish (or be cancelled)
|
||
await asyncio.gather(*tasks, return_exceptions=True)
|
||
|
||
async def almostmain(self, url):
|
||
async with aiohttp.ClientSession() as session:
|
||
asyncio.create_task(self.ws_worker(session, url, idx))
|
||
|
||
def newthread(self, url):
|
||
asyncio.run(self.main())
|
||
|
||
def test_it(self):
|
||
for url in self.WS_URLS:
|
||
_thread.stack_size(mpos.apps.good_stack_size())
|
||
_thread.start_new_thread(self.newthread, (url,))
|
||
time.sleep(15)
|