You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
Replace aiorepl with source copy
This commit is contained in:
@@ -3,9 +3,10 @@ This /lib folder contains:
|
||||
- mip.install("aiohttp") # easy websockets
|
||||
- mip.install("collections") # used by aiohttp
|
||||
- mip.install("unittest")
|
||||
- mip.install("aiorepl")
|
||||
|
||||
- https://github.com/micropython/micropython-lib/blob/master/python-stdlib/logging/logging.py version 0.6.2 # for About app
|
||||
- https://github.com/micropython/micropython-lib/blob/master/micropython/aiorepl/aiorepl.py version 0.2.2 # for asyncio REPL, allowing await expressions
|
||||
|
||||
- https://github.com/micropython/micropython-lib/blob/master/python-stdlib/base64/base64.py version 3.3.6 # for nostr
|
||||
- https://github.com/micropython/micropython-lib/blob/master/python-stdlib/binascii/binascii.py version 2.4.1 # for base64.py
|
||||
- https://github.com/micropython/micropython-lib/blob/master/python-stdlib/logging/logging.py version 0.6.2 # for About app
|
||||
- https://github.com/micropython/micropython-lib/blob/master/python-stdlib/shutil/shutil.py version 0.0.5 # for rmtree()
|
||||
|
||||
Binary file not shown.
@@ -0,0 +1,332 @@
|
||||
# MIT license; Copyright (c) 2022 Jim Mussared
|
||||
|
||||
import micropython
|
||||
from micropython import const
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
# Import statement (needs to be global, and does not return).
|
||||
_RE_IMPORT = re.compile("^import ([^ ]+)( as ([^ ]+))?")
|
||||
_RE_FROM_IMPORT = re.compile("^from [^ ]+ import ([^ ]+)( as ([^ ]+))?")
|
||||
# Global variable assignment.
|
||||
_RE_GLOBAL = re.compile("^([a-zA-Z0-9_]+) ?=[^=]")
|
||||
# General assignment expression or import statement (does not return a value).
|
||||
_RE_ASSIGN = re.compile("[^=]=[^=]")
|
||||
|
||||
# Command hist (One reserved slot for the current command).
|
||||
_HISTORY_LIMIT = const(5 + 1)
|
||||
|
||||
|
||||
CHAR_CTRL_A = const(1)
|
||||
CHAR_CTRL_B = const(2)
|
||||
CHAR_CTRL_C = const(3)
|
||||
CHAR_CTRL_D = const(4)
|
||||
CHAR_CTRL_E = const(5)
|
||||
|
||||
|
||||
async def execute(code, g, s):
|
||||
if not code.strip():
|
||||
return
|
||||
|
||||
try:
|
||||
if "await " in code:
|
||||
# Execute the code snippet in an async context.
|
||||
if m := _RE_IMPORT.match(code) or _RE_FROM_IMPORT.match(code):
|
||||
code = "global {}\n {}".format(m.group(3) or m.group(1), code)
|
||||
elif m := _RE_GLOBAL.match(code):
|
||||
code = "global {}\n {}".format(m.group(1), code)
|
||||
elif not _RE_ASSIGN.search(code):
|
||||
code = "return {}".format(code)
|
||||
|
||||
code = """
|
||||
import asyncio
|
||||
async def __code():
|
||||
{}
|
||||
|
||||
__exec_task = asyncio.create_task(__code())
|
||||
""".format(code)
|
||||
|
||||
async def kbd_intr_task(exec_task, s):
|
||||
while True:
|
||||
if ord(await s.read(1)) == CHAR_CTRL_C:
|
||||
exec_task.cancel()
|
||||
return
|
||||
|
||||
l = {"__exec_task": None}
|
||||
exec(code, g, l)
|
||||
exec_task = l["__exec_task"]
|
||||
|
||||
# Concurrently wait for either Ctrl-C from the stream or task
|
||||
# completion.
|
||||
intr_task = asyncio.create_task(kbd_intr_task(exec_task, s))
|
||||
|
||||
try:
|
||||
try:
|
||||
return await exec_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
intr_task.cancel()
|
||||
try:
|
||||
await intr_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
else:
|
||||
# Execute code snippet directly.
|
||||
try:
|
||||
try:
|
||||
micropython.kbd_intr(3)
|
||||
try:
|
||||
return eval(code, g)
|
||||
except SyntaxError:
|
||||
# Maybe an assignment, try with exec.
|
||||
return exec(code, g)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
micropython.kbd_intr(-1)
|
||||
|
||||
except Exception as err:
|
||||
print("{}: {}".format(type(err).__name__, err))
|
||||
|
||||
|
||||
# REPL task. Invoke this with an optional mutable globals dict.
|
||||
async def task(g=None, prompt="--> "):
|
||||
print("Starting asyncio REPL...")
|
||||
if g is None:
|
||||
g = __import__("__main__").__dict__
|
||||
try:
|
||||
micropython.kbd_intr(-1)
|
||||
s = asyncio.StreamReader(sys.stdin)
|
||||
# clear = True
|
||||
hist = [None] * _HISTORY_LIMIT
|
||||
hist_i = 0 # Index of most recent entry.
|
||||
hist_n = 0 # Number of history entries.
|
||||
c = 0 # ord of most recent character.
|
||||
t = 0 # timestamp of most recent character.
|
||||
while True:
|
||||
hist_b = 0 # How far back in the history are we currently.
|
||||
sys.stdout.write(prompt)
|
||||
cmd: str = ""
|
||||
paste = False
|
||||
curs = 0 # cursor offset from end of cmd buffer
|
||||
while True:
|
||||
b = await s.read(1)
|
||||
if not b: # Handle EOF/empty read
|
||||
break
|
||||
pc = c # save previous character
|
||||
c = ord(b)
|
||||
pt = t # save previous time
|
||||
t = time.ticks_ms()
|
||||
if c < 0x20 or c > 0x7E:
|
||||
if c == 0x0A:
|
||||
# LF
|
||||
if paste:
|
||||
sys.stdout.write(b)
|
||||
cmd += b
|
||||
continue
|
||||
# If the previous character was also LF, and was less
|
||||
# than 20 ms ago, this was likely due to CRLF->LFLF
|
||||
# conversion, so ignore this linefeed.
|
||||
if pc == 0x0A and time.ticks_diff(t, pt) < 20:
|
||||
continue
|
||||
if curs:
|
||||
# move cursor to end of the line
|
||||
sys.stdout.write("\x1b[{}C".format(curs))
|
||||
curs = 0
|
||||
sys.stdout.write("\n")
|
||||
if cmd:
|
||||
# Push current command.
|
||||
hist[hist_i] = cmd
|
||||
# Increase history length if possible, and rotate ring forward.
|
||||
hist_n = min(_HISTORY_LIMIT - 1, hist_n + 1)
|
||||
hist_i = (hist_i + 1) % _HISTORY_LIMIT
|
||||
|
||||
result = await execute(cmd, g, s)
|
||||
if result is not None:
|
||||
sys.stdout.write(repr(result))
|
||||
sys.stdout.write("\n")
|
||||
break
|
||||
elif c == 0x08 or c == 0x7F:
|
||||
# Backspace.
|
||||
if cmd:
|
||||
if curs:
|
||||
cmd = "".join((cmd[: -curs - 1], cmd[-curs:]))
|
||||
sys.stdout.write(
|
||||
"\x08\x1b[K"
|
||||
) # move cursor back, erase to end of line
|
||||
sys.stdout.write(cmd[-curs:]) # redraw line
|
||||
sys.stdout.write("\x1b[{}D".format(curs)) # reset cursor location
|
||||
else:
|
||||
cmd = cmd[:-1]
|
||||
sys.stdout.write("\x08 \x08")
|
||||
elif c == CHAR_CTRL_A:
|
||||
raw_repl(sys.stdin, g)
|
||||
break
|
||||
elif c == CHAR_CTRL_B:
|
||||
continue
|
||||
elif c == CHAR_CTRL_C:
|
||||
if paste:
|
||||
break
|
||||
sys.stdout.write("\n")
|
||||
break
|
||||
elif c == CHAR_CTRL_D:
|
||||
if paste:
|
||||
result = await execute(cmd, g, s)
|
||||
if result is not None:
|
||||
sys.stdout.write(repr(result))
|
||||
sys.stdout.write("\n")
|
||||
break
|
||||
|
||||
sys.stdout.write("\n")
|
||||
# Shutdown asyncio.
|
||||
asyncio.new_event_loop()
|
||||
return
|
||||
elif c == CHAR_CTRL_E:
|
||||
sys.stdout.write("paste mode; Ctrl-C to cancel, Ctrl-D to finish\n===\n")
|
||||
paste = True
|
||||
elif c == 0x1B:
|
||||
# Start of escape sequence.
|
||||
key = await s.read(2)
|
||||
if key in ("[A", "[B"): # up, down
|
||||
# Stash the current command.
|
||||
hist[(hist_i - hist_b) % _HISTORY_LIMIT] = cmd
|
||||
# Clear current command.
|
||||
b = "\x08" * len(cmd)
|
||||
sys.stdout.write(b)
|
||||
sys.stdout.write(" " * len(cmd))
|
||||
sys.stdout.write(b)
|
||||
# Go backwards or forwards in the history.
|
||||
if key == "[A":
|
||||
hist_b = min(hist_n, hist_b + 1)
|
||||
else:
|
||||
hist_b = max(0, hist_b - 1)
|
||||
# Update current command.
|
||||
cmd = hist[(hist_i - hist_b) % _HISTORY_LIMIT]
|
||||
sys.stdout.write(cmd)
|
||||
elif key == "[D": # left
|
||||
if curs < len(cmd) - 1:
|
||||
curs += 1
|
||||
sys.stdout.write("\x1b")
|
||||
sys.stdout.write(key)
|
||||
elif key == "[C": # right
|
||||
if curs:
|
||||
curs -= 1
|
||||
sys.stdout.write("\x1b")
|
||||
sys.stdout.write(key)
|
||||
elif key == "[H": # home
|
||||
pcurs = curs
|
||||
curs = len(cmd)
|
||||
sys.stdout.write("\x1b[{}D".format(curs - pcurs)) # move cursor left
|
||||
elif key == "[F": # end
|
||||
pcurs = curs
|
||||
curs = 0
|
||||
sys.stdout.write("\x1b[{}C".format(pcurs)) # move cursor right
|
||||
else:
|
||||
# sys.stdout.write("\\x")
|
||||
# sys.stdout.write(hex(c))
|
||||
pass
|
||||
else:
|
||||
if curs:
|
||||
# inserting into middle of line
|
||||
cmd = "".join((cmd[:-curs], b, cmd[-curs:]))
|
||||
sys.stdout.write(cmd[-curs - 1 :]) # redraw line to end
|
||||
sys.stdout.write("\x1b[{}D".format(curs)) # reset cursor location
|
||||
else:
|
||||
sys.stdout.write(b)
|
||||
cmd += b
|
||||
finally:
|
||||
micropython.kbd_intr(3)
|
||||
|
||||
|
||||
def raw_paste(s, window=512):
|
||||
sys.stdout.write("R\x01") # supported
|
||||
sys.stdout.write(bytearray([window & 0xFF, window >> 8, 0x01]).decode())
|
||||
eof = False
|
||||
idx = 0
|
||||
buff = bytearray(window)
|
||||
file = b""
|
||||
while not eof:
|
||||
for idx in range(window):
|
||||
b = s.read(1)
|
||||
c = ord(b)
|
||||
if c == CHAR_CTRL_C or c == CHAR_CTRL_D:
|
||||
# end of file
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
if c == CHAR_CTRL_C:
|
||||
raise KeyboardInterrupt
|
||||
file += buff[:idx]
|
||||
eof = True
|
||||
break
|
||||
buff[idx] = c
|
||||
|
||||
if not eof:
|
||||
file += buff
|
||||
sys.stdout.write("\x01") # indicate window available to host
|
||||
|
||||
return file
|
||||
|
||||
|
||||
def raw_repl(s, g: dict):
|
||||
"""
|
||||
This function is blocking to prevent other
|
||||
async tasks from writing to the stdio stream and
|
||||
breaking the raw repl session.
|
||||
"""
|
||||
heading = "raw REPL; CTRL-B to exit\n"
|
||||
line = ""
|
||||
sys.stdout.write(heading)
|
||||
|
||||
while True:
|
||||
line = ""
|
||||
sys.stdout.write(">")
|
||||
while True:
|
||||
b = s.read(1)
|
||||
c = ord(b)
|
||||
if c == CHAR_CTRL_A:
|
||||
rline = line
|
||||
line = ""
|
||||
|
||||
if len(rline) == 2 and ord(rline[0]) == CHAR_CTRL_E:
|
||||
if rline[1] == "A":
|
||||
line = raw_paste(s)
|
||||
break
|
||||
else:
|
||||
# reset raw REPL
|
||||
sys.stdout.write(heading)
|
||||
sys.stdout.write(">")
|
||||
continue
|
||||
elif c == CHAR_CTRL_B:
|
||||
# exit raw REPL
|
||||
sys.stdout.write("\n")
|
||||
return 0
|
||||
elif c == CHAR_CTRL_C:
|
||||
# clear line
|
||||
line = ""
|
||||
elif c == CHAR_CTRL_D:
|
||||
# entry finished
|
||||
# indicate reception of command
|
||||
sys.stdout.write("OK")
|
||||
break
|
||||
else:
|
||||
# let through any other raw 8-bit value
|
||||
line += b
|
||||
|
||||
if len(line) == 0:
|
||||
# Normally used to trigger soft-reset but stay in raw mode.
|
||||
# Fake it for aiorepl / mpremote.
|
||||
sys.stdout.write("Ignored: soft reboot\n")
|
||||
sys.stdout.write(heading)
|
||||
|
||||
try:
|
||||
result = exec(line, g)
|
||||
if result is not None:
|
||||
sys.stdout.write(repr(result))
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
except Exception as ex:
|
||||
print(line)
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
sys.print_exception(ex, sys.stdout)
|
||||
sys.stdout.write(chr(CHAR_CTRL_D))
|
||||
Reference in New Issue
Block a user