Files
MicroPythonOS/internal_filesystem/lib/mpos/task_manager.py
T
Thomas Farstrike 7ba45e692e TaskManager: without new thread works but blocks REPL
aiorepl (asyncio REPL) works but it's pretty limited

It's probably fine for production, but it means the user has to sys.exit()
in aiorepl before landing on the real interactive REPL, with asyncio tasks stopped.
2025-12-11 19:02:40 +01:00

48 lines
1.8 KiB
Python

import asyncio # this is the only place where asyncio is allowed to be imported - apps should not use it directly but use this TaskManager
import _thread
import mpos.apps
class TaskManager:
    """Single entry point for asyncio: apps schedule and await work through this class.

    Constructing an instance starts the asyncio event loop with a keep-alive
    coroutine; the classmethods/staticmethods are thin wrappers so that apps
    never have to import asyncio themselves.
    """

    # Shared across all callers. Finished tasks are pruned every time a new
    # task is created, so the list only holds tasks that were still pending
    # at the most recent create_task() call (plus the newest task).
    task_list = []

    def __init__(self):
        """Start the asyncio event loop; this call blocks for as long as the loop runs."""
        print("TaskManager starting asyncio_thread")
        # NOTE: the alternative is to run the loop on its own thread:
        #   _thread.stack_size(mpos.apps.good_stack_size())
        #   _thread.start_new_thread(asyncio.run, (self._asyncio_thread(100), ))
        # A tiny stack size of 1024 is fine for tasks that do nothing, but
        # real-world usage needs more.
        # Running it inline actually works, but it blocks the real REPL
        # (aiorepl works, but that's limited).
        asyncio.run(self._asyncio_thread(10))

    async def _asyncio_thread(self, ms_to_sleep):
        """Keep the event loop alive by sleeping forever in short intervals.

        ms_to_sleep: delay between wake-ups in milliseconds. This delay
        determines how quickly new tasks can be started, so keep it below
        human reaction speed.
        """
        print("asyncio_thread started")
        try:
            while True:
                await asyncio.sleep_ms(ms_to_sleep)
        finally:
            # The loop above never ends on its own, so the warning has to live
            # in a finally clause to be printed when this coroutine is torn
            # down by an exception or cancellation.
            print("WARNING: asyncio_thread exited, this shouldn't happen because now asyncio.create_task() won't work anymore!")

    @classmethod
    def create_task(cls, coroutine):
        """Schedule `coroutine` on the running event loop and remember its task.

        Tasks that have already finished are dropped from task_list first so
        the list cannot grow without bound. Returns the newly created task so
        the caller can await or cancel it.
        """
        # Slice assignment keeps the same list object, so anything holding a
        # reference to TaskManager.task_list sees the pruned contents.
        cls.task_list[:] = [pending for pending in cls.task_list if not pending.done()]
        new_task = asyncio.create_task(coroutine)
        cls.task_list.append(new_task)
        return new_task

    @classmethod
    def list_tasks(cls):
        """Print one line per tracked task: index, ph_key, done state and coroutine."""
        for index, task in enumerate(cls.task_list):
            # ph_key and coro are read straight off the task object
            # (presumably MicroPython's asyncio Task - verify on other ports).
            print(f"task {index}: ph_key:{task.ph_key} done:{task.done()} running {task.coro}")

    @staticmethod
    def sleep_ms(ms):
        """Return an awaitable that sleeps for `ms` milliseconds."""
        return asyncio.sleep_ms(ms)

    @staticmethod
    def sleep(s):
        """Return an awaitable that sleeps for `s` seconds."""
        return asyncio.sleep(s)

    @staticmethod
    def notify_event():
        """Return a new asyncio.Event for signalling between tasks."""
        return asyncio.Event()

    @staticmethod
    def wait_for(awaitable, timeout):
        """Return asyncio.wait_for(awaitable, timeout) so apps can await with a timeout."""
        return asyncio.wait_for(awaitable, timeout)