Files
esphome/tests/script/test_check_import_time.py

192 lines
6.4 KiB
Python

"""Unit tests for script/check_import_time.py."""
from __future__ import annotations
import importlib.util
import json
import os
from pathlib import Path
import sys
from unittest.mock import patch
import pytest
# Load the script-under-test as `check_import_time` (it's a hyphenated path
# inside `script/` that mirrors the existing `determine_jobs` pattern).
script_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "script")
)
sys.path.insert(0, script_dir)
spec = importlib.util.spec_from_file_location(
"check_import_time", os.path.join(script_dir, "check_import_time.py")
)
check_import_time = importlib.util.module_from_spec(spec)
spec.loader.exec_module(check_import_time)
def _entry(name: str, self_us: int, cumulative_us: int) -> dict:
"""Build a minimal HAR entry matching `importtime_waterfall --har`."""
return {
"request": {"url": name},
"time": cumulative_us,
"timings": {"receive": self_us, "wait": cumulative_us - self_us},
}
def _har(*entries: dict) -> dict:
return {"log": {"entries": list(entries)}}
def test_root_cumulative_us_returns_time_for_root_module() -> None:
har = _har(
_entry("dep_a", 500, 500),
_entry("dep_b", 300, 300),
_entry("esphome.__main__", 100, 1000),
)
assert check_import_time.root_cumulative_us(har, "esphome.__main__") == 1000
def test_root_cumulative_us_missing_module_raises() -> None:
har = _har(_entry("something.else", 100, 100))
with pytest.raises(RuntimeError, match="No HAR entry for 'esphome.__main__'"):
check_import_time.root_cumulative_us(har, "esphome.__main__")
def test_top_offenders_ranks_by_self_time_descending() -> None:
har = _har(
_entry("small", 100, 100),
_entry("big", 5000, 5000),
_entry("medium", 2000, 2500),
)
result = check_import_time.top_offenders(har, n=10)
assert [name for name, _, _ in result] == ["big", "medium", "small"]
assert result[0] == ("big", 5000, 5000)
def test_top_offenders_respects_n_limit() -> None:
har = _har(*[_entry(f"m{i}", i * 100, i * 100) for i in range(1, 20)])
assert len(check_import_time.top_offenders(har, n=5)) == 5
def test_top_offenders_dedupes_repeat_names_keeping_first() -> None:
har = _har(
_entry("pkg", 5000, 5000),
_entry("pkg", 100, 100), # reimport later in trace
_entry("other", 1000, 1000),
)
result = check_import_time.top_offenders(har, n=10)
assert [name for name, _, _ in result] == ["pkg", "other"]
# First occurrence wins
assert ("pkg", 5000, 5000) in result
def test_format_us_switches_to_ms_at_threshold() -> None:
assert check_import_time._format_us(500) == "500us"
assert check_import_time._format_us(999) == "999us"
assert check_import_time._format_us(1000) == "1.0ms"
assert check_import_time._format_us(12345) == "12.3ms"
def test_read_write_budget_roundtrip(tmp_path: Path) -> None:
budget_path = tmp_path / "budget.json"
with patch.object(check_import_time, "BUDGET_PATH", budget_path):
assert check_import_time.read_budget() == {}
check_import_time.write_budget(cumulative_us=12345, margin_pct=20)
loaded = check_import_time.read_budget()
assert loaded["cumulative_us"] == 12345
assert loaded["margin_pct"] == 20
assert loaded["target_module"] == check_import_time.TARGET_MODULE
def test_cmd_check_passes_when_measured_within_ceiling(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000, # 100ms
}
)
)
# Measured 90ms: inside 100ms + 15% = 115ms ceiling
har = _har(_entry(check_import_time.TARGET_MODULE, 1000, 90000))
args = type("A", (), {"har": None})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "measure", return_value=har),
):
rc = check_import_time.cmd_check(args)
assert rc == 0
out = capsys.readouterr().out
assert "measured esphome.__main__:" in out
assert "budget 100.0ms" in out
def test_cmd_check_fails_when_measured_exceeds_ceiling(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000,
}
)
)
# Measured 120ms: over 100ms + 15% = 115ms ceiling
har = _har(
_entry("offender_a", 10000, 10000),
_entry(check_import_time.TARGET_MODULE, 1000, 120000),
)
args = type("A", (), {"har": None})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "measure", return_value=har),
):
rc = check_import_time.cmd_check(args)
assert rc == 1
err = capsys.readouterr().err
assert "REGRESSION" in err
assert "120.0ms" in err
assert "offender_a" in err # top offender table
def test_cmd_check_returns_2_when_budget_missing(
tmp_path: Path, capsys: pytest.CaptureFixture[str]
) -> None:
budget_path = tmp_path / "nonexistent.json"
args = type("A", (), {"har": None})()
with patch.object(check_import_time, "BUDGET_PATH", budget_path):
rc = check_import_time.cmd_check(args)
assert rc == 2
assert "missing" in capsys.readouterr().err
def test_cmd_check_writes_har_when_path_given(tmp_path: Path) -> None:
budget_path = tmp_path / "budget.json"
budget_path.write_text(
json.dumps(
{
"target_module": check_import_time.TARGET_MODULE,
"margin_pct": 15,
"cumulative_us": 100000,
}
)
)
har_path = tmp_path / "out.har"
har_text = json.dumps(_har(_entry(check_import_time.TARGET_MODULE, 1000, 80000)))
args = type("A", (), {"har": str(har_path)})()
with (
patch.object(check_import_time, "BUDGET_PATH", budget_path),
patch.object(check_import_time, "run_waterfall", return_value=har_text),
):
rc = check_import_time.cmd_check(args)
assert rc == 0
assert har_path.exists()
assert json.loads(har_path.read_text()) == json.loads(har_text)