weather: add forecasts, switch to download manager (#75)

* weather: add forecasts, and summarize them

* weather: Switch to download manager

We had open coded http protocol, this simplifies it and switches to https
This commit is contained in:
Pavel Machek
2026-03-11 12:36:59 +01:00
committed by GitHub
parent 0ac9d7155e
commit 5ded4b40f6
@@ -12,7 +12,7 @@ try:
except ImportError:
pass
from mpos import Activity, MposKeyboard
from mpos import Activity, MposKeyboard, DownloadManager
import ujson
import utime
@@ -55,22 +55,87 @@ class WData:
99: "Thunderstorm + hail",
}
def init(self):
pass
def code_to_text(self, code):
return self.WMO_CODES.get(int(code), "Unknown")
class Hourly(WData):
def __init__(self, cw):
self.temp = cw["temperature_2m"]
self.wind = cw["windspeed"]
self.code = self.code_to_text(cw["weather_code"])
def get(self, v, cw, ind):
if ind == None:
return cw[v]
else:
return cw[v][ind]
def full(self):
return f"{self.code}\nTemp {self.temp:.1f} dew {self.dew:.1f} pres {self.pres:1f}\n" \
f"Precip {self.precip}\nWind {self.wind} gust {self.gust}"
def short(self):
r = f"{self.code} {self.temp:.1f}°C"
if self.dew + 3 > self.temp:
r += f" dew {self.dew:.1f}°C"
if self.gust > self.wind + 5:
r += f" {self.gust:.0f} g"
elif self.wind > 10:
r += f" {self.wind:.0f} w"
# FIXME: add precip
return r
def similar(self, prev):
if self.code != prev.code:
return False
if abs(self.temp - prev.temp) > 3:
return False
if abs(self.wind - prev.wind) > 10:
return False
if abs(self.gust - prev.gust) > 10:
return False
return True
def summarize(self):
return f"{self.code}\nTemp {self.temp}\nWind {self.wind}"
return self.ftime() + self.short()
class Hourly(WData):
def init(self, cw, ind):
super().init()
self.time = None
self.temp = self.get("temperature_2m", cw, ind)
self.dew = self.get("dewpoint_2m", cw, ind)
self.pres = self.get("pressure_msl", cw, ind)
self.precip = self.get("precipitation", cw, ind)
self.wind = self.get("wind_speed_10m", cw, ind)
self.gust = self.get("wind_gusts_10m", cw, ind)
self.raw_code = self.get("weather_code", cw, ind)
self.code = self.code_to_text(self.raw_code)
def ftime(self):
if self.time:
return self.time[11:13] + "h "
return ""
class Daily(WData):
def init(self, cw, ind):
super().init()
self.temp = self.get("temperature_2m_max", cw, ind)
self.temp_min = self.get("temperature_2m_min", cw, ind)
self.dew = self.get("dewpoint_2m_max", cw, ind)
self.dew_min = self.get("dewpoint_2m_min", cw, ind)
self.pres = None
self.precip = self.get("precipitation_sum", cw, ind)
self.wind = self.get("wind_speed_10m_max", cw, ind)
self.gust = self.get("wind_gusts_10m_max", cw, ind)
self.raw_code = self.get("weather_code", cw, ind)
self.code = self.code_to_text(self.raw_code)
def ftime(self):
return self.time[8:10] + ". "
class Weather:
name = "Prague"
lat = 50.08
lon = 14.44
# LKPR airport
lat = 50 + 6/60.
lon = 14 + 15/60.
def __init__(self):
self.now = None
@@ -84,68 +149,102 @@ class Weather:
# See https://open-meteo.com/en/docs?forecast_days=1&current=relative_humidity_2m
host = "api.open-meteo.com"
port = 80 # HTTP only
path = (
"/v1/forecast?"
"latitude={}&longitude={}"
"&current=temperature_2m,dewpoint_2m,pressure_msl,precipitation,weather_code,windspeed"
"&current=temperature_2m,dewpoint_2m,pressure_msl,precipitation,weather_code,wind_speed_10m,wind_gusts_10m"
"&forecast_hours=8"
"&hourly=temperature_2m,dewpoint_2m,pressure_msl,precipitation,weather_code,wind_speed_10m,wind_gusts_10m"
"&forecast_days=10"
"&daily=temperature_2m_max,temperature_2m_min,dewpoint_2m_min,dewpoint_2m_max,pressure_msl_min,pressure_msl_max,precipitation_sum,weather_code,wind_speed_10m_max,wind_gusts_10m_max"
"&timezone=auto"
).format(self.lat, self.lon)
print("Weather fetch: ", path)
# Resolve DNS
addr = socket.getaddrinfo(host, port, socket.AF_INET)[0][-1]
print("DNS", addr)
s = socket.socket()
s.connect(addr)
# Send HTTP request
request = (
"GET {} HTTP/1.1\r\n"
"Host: {}\r\n"
"Connection: close\r\n\r\n"
).format(path, host)
s.send(request.encode())
# ---- Read response ----
# Skip HTTP headers
buffer = b""
while True:
chunk = s.recv(256)
if not chunk:
raise Exception("No response")
buffer += chunk
header_end = buffer.find(b"\r\n\r\n")
if header_end != -1:
body = buffer[header_end + 4:]
break
# Read remaining body
while True:
chunk = s.recv(512)
if not chunk:
break
body += chunk
s.close()
# Strip non-json parts
body = body[5:]
body = body[:-7]
print("Have result:", body.decode())
data = DownloadManager.download_url("https://"+host+path)
if not data:
self.summary = "Download error"
return
#print("Have result:", body.decode())
# Parse JSON
data = ujson.loads(body)
data = ujson.loads(data)
# ---- Extract data ----
print("\n\n")
s = ""
print("---- ")
cw = data["current"]
self.now = Hourly(cw)
self.summary = self.now.summarize()
self.now = Hourly()
self.now.init(cw, None)
prev = self.now
t = self.now.summarize()
s += t + "\n"
print(t)
self.hourly = []
d = data["hourly"]
times = d["time"]
#print(d)
print("---- ")
for i in range(len(times)):
h = Hourly()
h.init(d, i)
h.time = times[i]
self.hourly.append(h)
if not h.similar(prev):
t = h.summarize()
s += t + "\n"
print(t)
prev = h
self.daily = []
d = data["daily"]
times = d["time"]
#print(d)
print("---- ")
for i in range(len(times)):
h = Daily()
h.init(d, i)
h.time = times[i]
self.daily.append(h)
if i == 0:
prev = h
elif not h.similar(prev):
t = h.summarize()
s += t + "\n"
print(t)
prev = h
self.summary = s
def summarize_future():
now = utime.time()
# Rain detection in next 24h
for h in weather.hourly[:24]:
if h["precip"] >= 1.0:
return "Rain soon"
# Temperature trend
if len(weather.hourly) > 24:
t0 = weather.hourly[0]["temp"]
t24 = weather.hourly[24]["temp"]
if abs(t24 - t0) < 2:
return "No change expected"
if t24 > t0:
return "Getting warmer"
else:
return "Getting cooler"
return "Stable weather"
weather = Weather()
@@ -167,32 +266,38 @@ class Main(Activity):
# ---- MAIN SCREEN ----
label_time = lv.label(scr_main)
label_time.set_text("(time)")
label_time.align(lv.ALIGN.TOP_LEFT, 10, 40)
label_time.set_style_text_font(lv.font_montserrat_24, 0)
self.label_time = label_time
label_weather = lv.label(scr_main)
label_weather.set_text(f"Weather for {weather.name} ({weather.lat}, {weather.lon})")
label_weather.align_to(label_time, lv.ALIGN.OUT_BOTTOM_LEFT, 0, 10)
label_weather.set_text(f"{weather.name} ({weather.lat}, {weather.lon})")
label_weather.align(lv.ALIGN.TOP_LEFT, 10, 24)
label_weather.set_style_text_font(lv.font_montserrat_14, 0)
self.label_weather = label_weather
btn_hourly = lv.button(scr_main)
btn_hourly.align(lv.ALIGN.TOP_RIGHT, -5, 24)
lv.label(btn_hourly).set_text("Reload")
btn_hourly.add_event_cb(lambda x: self.do_load(), lv.EVENT.CLICKED, None)
label_time = lv.label(scr_main)
label_time.set_text("(time)")
label_time.align_to(btn_hourly, lv.ALIGN.TOP_LEFT, -85, -10)
label_time.set_style_text_font(lv.font_montserrat_24, 0)
self.label_time = label_time
label_summary = lv.label(scr_main)
label_summary.set_text("(weather)")
#label_summary.set_long_mode(lv.label.LONG.WRAP)
label_summary.set_width(300)
#label_summary.set_width(300)
label_summary.align_to(label_weather, lv.ALIGN.OUT_BOTTOM_LEFT, 0, 5)
label_summary.set_style_text_font(lv.font_montserrat_24, 0)
self.label_summary = label_summary
btn_hourly = lv.button(scr_main)
btn_hourly.set_size(100, 40)
btn_hourly.align(lv.ALIGN.BOTTOM_LEFT, 10, -10)
lv.label(btn_hourly).set_text("Reload")
btn_hourly.add_event_cb(lambda x: self.do_load(), lv.EVENT.CLICKED, None)
if False:
btn_daily = lv.button(scr_main)
btn_daily.set_size(100, 40)
btn_daily.align(lv.ALIGN.BOTTOM_RIGHT, -10, -10)
lv.label(btn_daily).set_text("Daily")
self.setContentView(self.screen)
@@ -223,3 +328,87 @@ class Main(Activity):
self.label_summary.set_text("Requesting...")
weather.fetch()
# --------------------
def code():
# -----------------------------
# LVGL UI
# -----------------------------
scr_main = lv.obj()
scr_hourly = lv.obj()
scr_daily = lv.obj()
# ---- HOURLY SCREEN ----
hourly_list = lv.list(scr_hourly)
hourly_list.set_size(320, 200)
hourly_list.align(lv.ALIGN.TOP_MID, 0, 10)
btn_back1 = lv.button(scr_hourly)
btn_back1.set_size(80, 30)
btn_back1.align(lv.ALIGN.BOTTOM_MID, 0, -5)
lv.label(btn_back1).set_text("Back")
# ---- DAILY SCREEN ----
daily_list = lv.list(scr_daily)
daily_list.set_size(320, 200)
daily_list.align(lv.ALIGN.TOP_MID, 0, 10)
btn_back2 = lv.button(scr_daily)
btn_back2.set_size(80, 30)
btn_back2.align(lv.ALIGN.BOTTOM_MID, 0, -5)
lv.label(btn_back2).set_text("Back")
def foo():
btn_hourly.add_event_cb(go_hourly, lv.EVENT.CLICKED, None)
btn_daily.add_event_cb(go_daily, lv.EVENT.CLICKED, None)
btn_back1.add_event_cb(go_back, lv.EVENT.CLICKED, None)
btn_back2.add_event_cb(go_back, lv.EVENT.CLICKED, None)
# -----------------------------
# STARTUP
# -----------------------------
def go_hourly(e):
populate_hourly()
lv.scr_load(scr_hourly)
def go_daily(e):
populate_daily()
lv.scr_load(scr_daily)
def go_back(e):
lv.scr_load(scr_main)
def update_ui():
if weather.current_temp is not None:
text = "%s %.1f C" % (
weather_code_to_text(weather.current_code),
weather.current_temp
)
label_weather.set_text(text)
label_summary.set_text(weather.summary)
def populate_hourly():
hourly_list.clean()
for h in weather.hourly[:24]:
line = "%s %.1fC %.1fmm" % (
h["time"][11:16],
h["temp"],
h["precip"]
)
hourly_list.add_text(line)
def populate_daily():
daily_list.clean()
for d in weather.daily:
line = "%s %.1f/%.1f" % (
d["date"],
d["high"],
d["low"]
)
daily_list.add_text(line)