You've already forked MicroPythonOS
mirror of
https://github.com/m5stack/MicroPythonOS.git
synced 2026-05-20 11:51:27 -07:00
137 lines
3.9 KiB
Python
137 lines
3.9 KiB
Python
import os
|
|
import socket
|
|
import uio
|
|
|
|
import _webrepl
|
|
from . import webrepl
|
|
import websocket
|
|
|
|
# Single HTML page with its assets minified and inlined.
# The path has no leading slash, so open() resolves it relative to the current
# working directory.
WEBREPL_HTML_PATH = "builtin/html/webrepl_inlined_minified.html"
|
|
'''
|
|
# Unused as these files are minified and inlined:
|
|
#WEBREPL_HTML_PATH = "/builtin/html/webrepl.html"
|
|
WEBREPL_CONTENT_PATH = "/builtin/html/webrepl.js"
|
|
WEBREPL_TERM_PATH = "/builtin/html/term.js"
|
|
WEBREPL_CSS_PATH = "/builtin/html/webrepl.css"
|
|
WEBREPL_FILE_SAVER_PATH = "/builtin/html/FileSaver.js"
|
|
'''
|
|
|
|
# Routing table for plain HTTP requests:
#   request path (bytes, query string already stripped by the request parser)
#   -> (file path to serve, Content-Type header value as bytes)
# Any path not listed here is answered with "404 Not Found" by accept_handler.
WEBREPL_ASSETS = {
    b"/": (WEBREPL_HTML_PATH, b"text/html"),
    b"/index.html": (WEBREPL_HTML_PATH, b"text/html"),
    # Disabled routes: these assets are inlined into the single HTML page.
    #b"/webrepl.css": (WEBREPL_CSS_PATH, b"text/css"),
    #b"/webrepl.js": (WEBREPL_CONTENT_PATH, b"application/javascript"),
    #b"/term.js": (WEBREPL_TERM_PATH, b"application/javascript"),
    #b"/FileSaver.js": (WEBREPL_FILE_SAVER_PATH, b"application/javascript"),
}
|
|
|
|
|
|
class _MakefileSocket:
|
|
def __init__(self, sock, raw_request):
|
|
self._sock = sock
|
|
self._raw_request = raw_request
|
|
|
|
def makefile(self, *args, **kwargs):
|
|
return uio.BytesIO(self._raw_request)
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self._sock, name)
|
|
|
|
|
|
def _read_http_request(cl):
|
|
req = cl.makefile("rwb", 0)
|
|
first_line = req.readline()
|
|
if not first_line:
|
|
return None, None, b""
|
|
|
|
raw_request = first_line
|
|
headers = {}
|
|
while True:
|
|
line = req.readline()
|
|
if not line:
|
|
break
|
|
raw_request += line
|
|
if line == b"\r\n":
|
|
break
|
|
if b":" in line:
|
|
key, value = line.split(b":", 1)
|
|
headers[key.strip().lower()] = value.strip().lower()
|
|
|
|
parts = first_line.split()
|
|
path = parts[1] if len(parts) >= 2 else b"/"
|
|
if b"?" in path:
|
|
path = path.split(b"?", 1)[0]
|
|
|
|
return path, headers, raw_request
|
|
|
|
|
|
def _is_websocket_request(headers):
|
|
connection = headers.get(b"connection", b"")
|
|
upgrade = headers.get(b"upgrade", b"")
|
|
return b"upgrade" in connection and upgrade == b"websocket"
|
|
|
|
|
|
def _send_response(cl, status, content_type, body):
|
|
cl.send(b"HTTP/1.0 " + status + b"\r\n")
|
|
cl.send(b"Server: MicroPythonOS\r\n")
|
|
cl.send(b"Content-Type: " + content_type + b"\r\n")
|
|
cl.send(b"Content-Length: %d\r\n\r\n" % len(body))
|
|
cl.send(body)
|
|
cl.close()
|
|
|
|
|
|
def _send_file_response(cl, path, content_type):
    """Serve the file at ``path`` over ``cl`` as an HTTP response.

    The whole file is read into memory and sent with ``200 OK`` and the given
    ``content_type``. If opening or reading raises ``OSError``, a plain-text
    ``404 Not Found`` response is sent instead.

    Always returns False, in both the success and the not-found case.
    """
    try:
        with open(path, "rb") as asset:
            payload = asset.read()
    except OSError:
        _send_response(cl, b"404 Not Found", b"text/plain", b"Not Found")
    else:
        _send_response(cl, b"200 OK", content_type, payload)
    return False
|
|
|
|
|
|
def _start_webrepl_session(cl, remote_addr):
    """Attach the REPL to ``cl`` after a completed WebSocket handshake.

    Records the socket as ``webrepl.client_s``, wraps it in a WebSocket
    stream and then a ``_webrepl`` stream, switches the socket to
    non-blocking mode, and installs the stream with ``os.dupterm``.

    Always returns True.
    """
    print("\nWebREPL connection from:", remote_addr)
    webrepl.client_s = cl

    ws_stream = websocket.websocket(cl, True)
    repl_stream = _webrepl._webrepl(ws_stream)
    cl.setblocking(False)
    if hasattr(os, "dupterm_notify"):
        # NOTE(review): option 20 is not named anywhere in this file;
        # presumably it registers os.dupterm_notify as the socket's
        # incoming-data callback - confirm against the MicroPython port.
        cl.setsockopt(socket.SOL_SOCKET, 20, os.dupterm_notify)
    os.dupterm(repl_stream)

    return True
|
|
|
|
|
|
def accept_handler(listen_sock):
    """Accept one connection and serve it as HTTP or upgrade it to WebREPL.

    The request is read once, then dispatched:
      * a WebSocket upgrade request goes through ``webrepl.server_handshake``
        (the bytes already read are replayed via ``_MakefileSocket``) and, on
        success, a WebREPL session is started on the socket;
      * a path listed in ``WEBREPL_ASSETS`` is answered with that file;
      * anything else gets ``404 Not Found``.

    Args:
        listen_sock: listening socket with a pending connection.

    Returns:
        True when a WebREPL session was started on the accepted socket,
        False in every other case (the client socket is closed by then).
    """
    cl, remote_addr = listen_sock.accept()
    # Was "\webrepl_http ...": "\w" is an invalid escape sequence that printed
    # a literal backslash; a leading newline is intended, as in the WebREPL
    # session log message.
    print("\nwebrepl_http connection from:", remote_addr)
    try:
        path, headers, raw_request = _read_http_request(cl)
        if not path:
            # Nothing readable was received (headers is None here as well).
            cl.close()
            return False

        if _is_websocket_request(headers):
            # The request was already consumed from the socket, so the
            # handshake parser reads it back from the saved bytes.
            if not webrepl.server_handshake(_MakefileSocket(cl, raw_request)):
                cl.close()
                return False
            return _start_webrepl_session(cl, remote_addr)

        if path in WEBREPL_ASSETS:
            asset_path, content_type = WEBREPL_ASSETS[path]
            return _send_file_response(cl, asset_path, content_type)

        _send_response(cl, b"404 Not Found", b"text/plain", b"Not Found")
        return False
    except Exception as exc:
        # Top-level boundary: one bad connection must not escape the handler.
        print("webrepl_http: error handling connection:", exc)
        try:
            cl.close()
        except Exception:
            # Best-effort cleanup; the socket may already be closed.
            pass
        return False
|