diff --git a/docs/npu-utilization-digest.md b/docs/npu-utilization-digest.md new file mode 100644 index 0000000..195e1b7 --- /dev/null +++ b/docs/npu-utilization-digest.md @@ -0,0 +1,49 @@ +# NPU utilization digest + +Compact on-demand observability for Will's local OpenVINO/NPU specialists. + +Script: + +```bash +/home/will/lab/swarm/scripts/npu-utilization-digest.py --format text +``` + +Safe defaults: + +- read-only for services; no service starts/stops/restarts, routing changes, vector DB mutation, advisory POSTs, outbound sends, or memory writes; +- writes only a compact JSONL artifact under `/home/will/.local/state/npu-utilization/digests` unless `--no-write` is passed; +- uses synthetic/non-private requests for embeddings, rerank, classifier dry-run, and doc triage; +- keeps GenAI generation disabled by default when the worker is not loaded, to avoid cold-load side effects; +- advisory gateway remains health-only because POSTs write metadata/events; +- NPU proof is only true when an inference probe ran and `/sys/class/accel/accel0/device/npu_busy_time_us` increased around that probe. + +Common commands: + +```bash +# Compact CLI digest, plus JSONL artifact. +scripts/npu-utilization-digest.py --format text + +# No artifact write; useful during reviews. +scripts/npu-utilization-digest.py --no-write --include-genai-smoke false + +# Machine-readable stdout. +scripts/npu-utilization-digest.py --format jsonl --no-write + +# CI/unit tests; live services not required. +python -m pytest tests/test_npu_utilization_digest.py -q +``` + +Output shape is intentionally small: service booleans, counts, average probe ms, sysfs deltas, proof flags, fallback warning counts, artifact path, and closed gates. `fallbacks` includes unavailable services, failed/missing proof, and skipped proof-capable smokes such as disabled Whisper/doc-triage probes or GenAI cold-load skips; intentionally health-only RAG/advisory rows are not fallbacks unless unavailable. 
It does not print raw embeddings, transcripts, OCR text, model completions, request headers, or full upstream JSON. + +Covered rows: + +- `embeddings`: `/v1/embeddings` synthetic string, positive sysfs delta required. +- `rerank`: `/rerank` with two synthetic docs, positive sysfs delta required. +- `whisper`: health-only unless the bounded generated-WAV smoke is enabled. +- `classifier`: `/v1/classify` with `dry_run=true` and `include_evidence=false`, positive sysfs delta required. +- `genai`: health-only by default; skips when `loaded=false` unless explicitly opted in. +- `doc_triage`: one approved synthetic sample under the service sample root, with `allowed_roots` narrowed to that sample directory; NPU proof is via embeddings. +- `rag_endpoint` and `rag_health`: health-only; no vector mutation. +- `advisory_gateway`: health-only; `closed:advisory-post` gate remains closed. + +Closed gates left for later approval: sending/delivery, recurring timer, GenAI cold-load smoke, advisory POSTs, Atlas/Hermes routing changes, vector mutation/reindex, and broad private document/audio/image roots. diff --git a/scripts/npu-utilization-digest.py b/scripts/npu-utilization-digest.py new file mode 100755 index 0000000..72942de --- /dev/null +++ b/scripts/npu-utilization-digest.py @@ -0,0 +1,536 @@ +#!/usr/bin/env python3 +"""Compact, read-only NPU/OpenVINO utilization digest. + +Default behavior is safe for on-demand or scheduled runs: health checks plus +bounded synthetic probes, one compact JSONL artifact, and no service restarts, +routing changes, advisory POSTs, vector mutations, outbound sends, or private +root broadening. 
+""" +from __future__ import annotations + +import argparse +import base64 +import datetime as dt +import json +import math +import os +import tempfile +import time +import urllib.error +import urllib.parse +import urllib.request +import uuid +import wave +from dataclasses import asdict, dataclass, field +from pathlib import Path +from typing import Any, Callable + +BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") +DEFAULT_OUT_DIR = Path("/home/will/.local/state/npu-utilization/digests") + +EMBED_URL = "http://127.0.0.1:18817/v1/embeddings" +EMBED_HEALTH_URL = "http://127.0.0.1:18817/healthz" +RERANK_URL = "http://127.0.0.1:18818/rerank" +RERANK_HEALTH_URL = "http://127.0.0.1:18818/readyz" +WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions" +WHISPER_HEALTH_URL = "http://127.0.0.1:18816/health" +CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify" +CLASSIFIER_HEALTH_URL = "http://127.0.0.1:18819/healthz" +GENAI_HEALTH_URL = "http://127.0.0.1:18820/healthz" +GENAI_GENERATE_URL = "http://127.0.0.1:18820/v1/generate" +DOC_TRIAGE_URL = "http://127.0.0.1:18829/triage" +DOC_TRIAGE_HEALTH_URL = "http://127.0.0.1:18829/healthz" +RAG_ENDPOINT_HEALTH_URL = "http://127.0.0.1:18810/healthz" +RAG_HEALTH_URL = "http://127.0.0.1:18814/healthz" +ADVISORY_HEALTH_URL = "http://172.19.0.1:18830/healthz" + + +@dataclass +class ServiceRow: + type: str = "service" + service: str = "" + reachable: bool = False + probe_ran: bool = False + proof_ok: bool | None = None + calls: int = 0 + items: int = 0 + avg_ms: float | None = None + npu_delta_us: int | None = None + response_delta_us: int | None = None + mode: str = "unavailable" + fallbacks: int = 0 + warnings: list[str] = field(default_factory=list) + gate: str = "none" + jobs: int | None = None + events: int | None = None + files: int | None = None + docs: int | None = None + text_len: int | None = None + sample_rate: int | None = None + embedding_count: int | None = None + embedding_dim: int | None = None 
def http_get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]:
    """GET ``url`` and return ``(status, payload)`` without raising.

    The payload is always a dict so callers can use ``.get`` on it safely:

    * a JSON object body is returned unchanged together with the HTTP status;
    * a JSON body that is not an object (list, string, number, ...) is replaced
      by ``{"error": "non_object_json"}`` while the HTTP status is kept;
    * an HTTP error response yields its status code plus the decoded error
      body, or ``{"error": "http_error"}`` when that body cannot be read or
      parsed;
    * any other failure (connection problem, timeout, invalid JSON on a
      successful response, ...) yields status ``0`` and the exception class
      name as produced by ``safe_error``.

    At most 1 MiB of the response body is read.
    """

    def as_object(status: int, text: str) -> tuple[int, dict[str, Any]]:
        # Health endpoints occasionally answer with a bare string or list;
        # never hand such a value to callers that expect a mapping.
        parsed = json.loads(text or "{}")
        if isinstance(parsed, dict):
            return status, parsed
        return status, {"error": "non_object_json"}

    try:
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read(1024 * 1024).decode("utf-8", "replace")
            return as_object(int(resp.status), body)
    except urllib.error.HTTPError as exc:
        try:
            body = exc.read(1024 * 1024).decode("utf-8", "replace")
            return as_object(int(exc.code), body)
        except Exception:
            # The error body is best-effort only; keep the real status code.
            return int(exc.code), {"error": "http_error"}
    except Exception as exc:
        return 0, {"error": safe_error(exc)}
def apply_proof(row: ServiceRow, delta: int | None) -> None:
    """Store the measured sysfs delta on ``row`` and derive its proof verdict.

    Proof holds only for a strictly positive delta. Otherwise one fallback is
    counted and a warning is appended: ``missing_sysfs_counter`` when no delta
    could be computed, ``no_positive_sysfs_delta`` when it was zero or negative.
    """
    proven = delta is not None and delta > 0
    row.npu_delta_us = delta
    row.proof_ok = proven
    if proven:
        return
    row.fallbacks += 1
    if delta is None:
        row.warnings.append("missing_sysfs_counter")
    else:
        row.warnings.append("no_positive_sysfs_delta")
def probe_embeddings(
    timeout: float,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the embeddings service, then run one synthetic embedding call.

    Args:
        timeout: per-request timeout in seconds, used for both the health
            check and the probe.
        busy_path: sysfs counter file sampled before and after the probe.
        post_json: callable used to send the probe; injectable for tests.

    Returns:
        A ``ServiceRow``. When the health check fails the row is returned
        as-is; otherwise it carries the probe timing, the sysfs delta, the
        proof verdict, and compact metadata (embedding count and dimension).
        Vector values themselves are never copied onto the row.
    """
    row, _ = health_row("embeddings", EMBED_HEALTH_URL, timeout)
    if not row.reachable:
        return row

    payload = {"input": "non-private npu utilization digest probe", "model": "bge-base-en-v1.5-int8-ov"}
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(EMBED_URL, payload, timeout), "embeddings", busy_path
    )

    row.probe_ran = True
    row.calls = 1
    row.items = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "data" in data

    vectors = data.get("data")
    if not isinstance(vectors, list):
        vectors = []
    row.embedding_count = len(vectors)

    # Only accept an integer dimension from upstream, like the other
    # int-guarded response fields; anything else must not reach the digest.
    dim = data.get("embedding_dim")
    if not isinstance(dim, int):
        dim = None
        # Responses without an explicit dimension still reveal it through the
        # length of the first vector; only the length is kept.
        first = vectors[0] if vectors else None
        vector = first.get("embedding") if isinstance(first, dict) else None
        if isinstance(vector, list):
            dim = len(vector)
    row.embedding_dim = dim

    reported = data.get("npu_busy_delta_us")
    row.response_delta_us = reported if isinstance(reported, int) else None

    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def probe_classifier(
    timeout: float,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the classifier, then classify one synthetic event in dry-run mode.

    The request always sets ``dry_run`` to true and ``include_evidence`` to
    false. The returned row records timing, the sysfs delta and proof verdict,
    the number of labels, and 0/1 escalate/suppress flags derived from the
    response's ``action`` and ``labels`` objects.
    """
    row, _ = health_row("classifier", CLASSIFIER_HEALTH_URL, timeout, mode="dry_run")
    if not row.reachable:
        return row

    request_body = {
        "id": "npu-digest-probe",
        "text": "Non-private cron event: backup completed successfully, no user action required.",
        "options": {"dry_run": True, "include_evidence": False},
    }

    def send() -> tuple[int, dict[str, Any]]:
        return post_json(CLASSIFIER_URL, request_body, timeout)

    status, data, elapsed, delta = measure_probe(send, "classifier", busy_path)

    # Anything that is not a JSON object is treated as an empty mapping.
    labels: dict[str, Any] = data.get("labels") if isinstance(data.get("labels"), dict) else {}
    action: dict[str, Any] = data.get("action") if isinstance(data.get("action"), dict) else {}

    # First integer-valued delta key wins; the sysfs-prefixed key is preferred.
    reported_delta = None
    for key in ("sysfs_npu_busy_delta_us", "npu_busy_delta_us"):
        candidate = data.get(key)
        if isinstance(candidate, int):
            reported_delta = candidate
            break

    wants_escalation = action.get("escalate") or labels.get("action_required") or labels.get("tool_needed")
    wants_suppression = action.get("suppress") or labels.get("no_op") or labels.get("duplicate")

    row.probe_ran = True
    row.calls = 1
    row.events = 1
    row.avg_ms = elapsed
    row.mode = "dry_run"
    row.dry_run = True
    row.reachable = status == 200 and "error" not in data
    row.response_delta_us = reported_delta
    row.escalate = int(bool(wants_escalation))
    row.suppress = int(bool(wants_suppression))
    row.items = len(labels)

    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def post_multipart_file(url: str, file_path: Path, timeout: float) -> tuple[int, dict[str, Any]]:
    """POST ``file_path`` to ``url`` as multipart/form-data and return ``(status, payload)``.

    The form carries ``model=whisper``, ``response_format=json`` and the file
    under the ``file`` field with content type ``audio/wav``.

    The function does not raise for I/O or HTTP problems and always returns a
    dict payload:

    * an unreadable input file or any transport failure yields status ``0``
      and the exception class name as produced by ``safe_error``;
    * an HTTP error response yields its real status code plus the decoded
      error body, or ``{"error": "http_error"}`` when that body is unusable,
      matching ``http_post_json``;
    * a successful response whose JSON is not an object yields
      ``{"error": "non_object_json"}`` with the HTTP status kept.

    At most 1 MiB of the response body is read.
    """
    boundary = "----npu-digest-" + uuid.uuid4().hex
    try:
        file_bytes = file_path.read_bytes()
    except OSError as exc:
        # A missing or unreadable sample must degrade to an error row, not crash the digest.
        return 0, {"error": safe_error(exc)}

    parts = [
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"model\"\r\n\r\nwhisper\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"response_format\"\r\n\r\njson\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"npu-digest.wav\"\r\nContent-Type: audio/wav\r\n\r\n".encode(),
        file_bytes,
        f"\r\n--{boundary}--\r\n".encode(),
    ]
    req = urllib.request.Request(
        url,
        data=b"".join(parts),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            parsed = json.loads(resp.read(1024 * 1024).decode("utf-8", "replace") or "{}")
            if not isinstance(parsed, dict):
                return int(resp.status), {"error": "non_object_json"}
            return int(resp.status), parsed
    except urllib.error.HTTPError as exc:
        # Keep the upstream status code instead of collapsing it to 0.
        try:
            parsed = json.loads(exc.read(1024 * 1024).decode("utf-8", "replace") or "{}")
        except Exception:
            return int(exc.code), {"error": "http_error"}
        if not isinstance(parsed, dict):
            return int(exc.code), {"error": "http_error"}
        return int(exc.code), parsed
    except Exception as exc:
        return 0, {"error": safe_error(exc)}
def probe_genai(
    timeout: float,
    include_smoke: bool,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the GenAI worker and optionally run one tiny generation.

    Args:
        timeout: per-request timeout in seconds.
        include_smoke: whether the generation smoke may run at all.
        busy_path: sysfs counter file sampled around the generation call.
        post_json: callable used to send the generation request; injectable
            for tests.

    Returns:
        A ``ServiceRow``. The generation request is sent only when
        ``include_smoke`` is true and the health payload does not report
        ``loaded`` as false. A skipped smoke is recorded as a fallback whose
        warning equals the skip reason (``skipped_cold_load`` or
        ``smoke_disabled``). A smoke that ran but failed over HTTP adds the
        ``probe_http_failed`` warning and a short error string, as the other
        probes do.
    """
    row, health = health_row("genai", GENAI_HEALTH_URL, timeout)
    row.loaded = bool(health.get("loaded")) if isinstance(health, dict) and "loaded" in health else None
    row.jobs = 0
    if not row.reachable:
        return row

    if not include_smoke or row.loaded is False:
        # Never trigger a cold model load: an unloaded worker is skipped even
        # when the smoke was requested.
        row.mode = "loaded=false" if row.loaded is False else "health_only"
        row.reason = "skipped_cold_load" if row.loaded is False else "smoke_disabled"
        mark_skipped_fallback(row, row.reason)
        return row

    payload = {"prompt": "Say pong.", "max_new_tokens": 8}
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(GENAI_GENERATE_URL, payload, timeout), "genai", busy_path
    )
    row.probe_ran = True
    row.calls = 1
    row.jobs = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "error" not in data
    apply_proof(row, delta)
    if not row.reachable:
        # Same failure reporting as the embeddings/rerank/classifier probes.
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
timeout, gate="closed:private-root") + row.files = 0 + if not row.reachable or not include_smoke: + if row.reachable: + row.mode = "health_only" + row.reason = "smoke_disabled" + mark_skipped_fallback(row, "skipped") + return row + sample = doc_triage_sample_path() + if sample is not None: + root = sample.parent.resolve() + payload = {"path": str(sample), "options": {"allowed_roots": [str(root)], "include_ocr_text": False, "use_embeddings": True}} + status, data, elapsed, delta = measure_probe(lambda: post_json(DOC_TRIAGE_URL, payload, timeout), "doc_triage", busy_path) + else: + with tempfile.TemporaryDirectory(prefix="npu-digest-doc-") as tmp: + root = Path(tmp).resolve() + sample = root / "synthetic-invoice.png" + sample.write_bytes(base64.b64decode("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII=")) + sample.with_suffix(".png.txt").write_text("Synthetic invoice. Amount due $12.34 by 2026-06-30. No private data.\n") + payload = {"path": str(sample), "options": {"allowed_roots": [str(root)], "include_ocr_text": False, "use_embeddings": True}} + status, data, elapsed, delta = measure_probe(lambda: post_json(DOC_TRIAGE_URL, payload, timeout), "doc_triage", busy_path) + row.probe_ran = True + row.calls = 1 + row.files = 1 + row.avg_ms = elapsed + row.mode = "NPU-via-embedding-service" + row.allowed_roots_count = 1 + row.reachable = status == 200 and data.get("ok", True) is not False + raw_result = data.get("result") + result: dict[str, Any] = raw_result if isinstance(raw_result, dict) else {} + raw_pages = result.get("pages") + pages: list[Any] = raw_pages if isinstance(raw_pages, list) else [] + embedding: dict[str, Any] = {} + if pages and isinstance(pages[0], dict): + raw_attn = pages[0].get("needs_attention") + attn: dict[str, Any] = raw_attn if isinstance(raw_attn, dict) else {} + raw_embedding = attn.get("embedding") + embedding = raw_embedding if isinstance(raw_embedding, dict) else {} + row.response_delta_us = 
def build_summary(
    rows: list[ServiceRow],
    artifact_path: str | None,
    counter_delta_us: int | None,
    started_at: str,
) -> dict[str, Any]:
    """Aggregate the per-service rows into the single summary record.

    Counts reachable services, probes that ran and produced a proof verdict
    (and how many of those succeeded), gates whose name starts with
    ``closed:``, the total of per-row fallbacks, and how often each warning
    string occurs across all rows.
    """
    reachable_count = 0
    verdict_count = 0
    proven_count = 0
    closed_gate_count = 0
    fallback_total = 0
    warning_counts: dict[str, int] = {}

    for entry in rows:
        if entry.reachable:
            reachable_count += 1
        if entry.probe_ran and entry.proof_ok is not None:
            verdict_count += 1
            if entry.proof_ok:
                proven_count += 1
        if str(entry.gate).startswith("closed:"):
            closed_gate_count += 1
        fallback_total += entry.fallbacks
        for label in entry.warnings:
            warning_counts[label] = warning_counts.get(label, 0) + 1

    summary: dict[str, Any] = {}
    summary["type"] = "summary"
    summary["timestamp"] = started_at
    summary["counter"] = str(BUSY_PATH)
    summary["delta_us"] = counter_delta_us
    summary["services_ok"] = reachable_count
    summary["services_total"] = len(rows)
    summary["proof_ok"] = proven_count
    summary["proof_total"] = verdict_count
    summary["fallbacks"] = fallback_total
    summary["gates_closed"] = closed_gate_count
    summary["warnings"] = warning_counts
    summary["artifact"] = artifact_path
    return summary
def str_bool(value: str) -> bool:
    """Parse a command-line boolean word, case-insensitively.

    Accepts ``1/true/yes/y/on`` as true and ``0/false/no/n/off`` as false.

    Raises:
        argparse.ArgumentTypeError: for any other word.
    """
    words = {
        "1": True,
        "true": True,
        "yes": True,
        "y": True,
        "on": True,
        "0": False,
        "false": False,
        "no": False,
        "n": False,
        "off": False,
    }
    verdict = words.get(value.lower())
    if verdict is None:
        raise argparse.ArgumentTypeError("expected true or false")
    return verdict
def main(argv: list[str] | None = None) -> int:
    """Run the digest, optionally persist it, print it, and return an exit code.

    Args:
        argv: argument list handed to ``parse_args``; ``None`` lets argparse
            read the process arguments.

    Returns:
        ``2`` when ``--strict-proof`` is set and at least one probe that ran
        ended with ``proof_ok`` false, otherwise ``0``.
    """
    args = parse_args(argv)
    summary, rows = run(args)

    if not args.no_write:
        out_dir = Path(args.out).expanduser()
        artifact = write_jsonl(summary, rows, out_dir)
        summary["artifact"] = str(artifact)
        # The artifact path is only known after the first write. Write again
        # through the same helper so the on-disk summary line carries it;
        # write_jsonl names the file from summary["timestamp"], which has not
        # changed, so this overwrites the same file with the same encoding.
        write_jsonl(summary, rows, out_dir)

    if args.format == "jsonl":
        print(json.dumps(summary, sort_keys=True, separators=(",", ":")))
        for row in rows:
            print(json.dumps(compact_dict(row), sort_keys=True, separators=(",", ":")))
    else:
        print(render_text(summary, rows))

    if args.strict_proof and any(r.probe_ran and r.proof_ok is False for r in rows):
        return 2
    return 0
def test_embeddings_row_redacts_vectors(tmp_path, monkeypatch):
    """Vector values returned by the embeddings service never reach the digest.

    The row is produced by ``probe_embeddings`` from a fake response that
    contains a distinctive vector, so the redaction assertions below check
    real probe output rather than a hand-built row that never held a vector.
    """
    busy = tmp_path / "busy"
    busy.write_text("100")
    out_dir = tmp_path / "digests"

    def fake_health(service, url, timeout, gate="none", mode="health_only"):
        return digest.ServiceRow(service=service, reachable=True, mode=mode, gate=gate), {"ok": True}

    def fake_post(url, payload, timeout):
        busy.write_text("105")
        return 200, {
            "data": [{"embedding": [0.987654, 0.876543, 0.765432]}],
            "embedding_dim": 3,
            "npu_busy_delta_us": 5,
        }

    monkeypatch.setattr(digest, "health_row", fake_health)
    row = digest.probe_embeddings(1, busy_path=busy, post_json=fake_post)
    assert row.proof_ok is True
    assert row.embedding_count == 1
    assert row.embedding_dim == 3

    summary = digest.build_summary([row], None, 5, "2026-06-05T14:20:00-07:00")
    text = digest.render_text(summary, [row])
    assert "embedding_count" not in text  # counts are intentionally terse in text
    assert "0.987654" not in text

    out = digest.write_jsonl(summary, [row], out_dir)
    body = out.read_text()
    assert "embedding_dim" in body  # compact metadata key is okay
    assert "0.987654" not in body
    assert "0.876543" not in body
def test_genai_loaded_false_skips_default_smoke(monkeypatch):
    """An unloaded GenAI worker is never sent a generation request.

    Covers both the default (smoke disabled) and the opted-in case, so the
    cold-load guard itself is exercised and not only the skip-reason label.
    """
    posts = []

    def fake_health(service, url, timeout, gate="none", mode="health_only"):
        return digest.ServiceRow(service=service, reachable=True, mode=mode), {"ok": True, "loaded": False}

    def fake_post(url, payload, timeout):
        posts.append(url)
        return 200, {}

    monkeypatch.setattr(digest, "health_row", fake_health)
    for include_smoke in (False, True):
        row = digest.probe_genai(1, include_smoke=include_smoke, post_json=fake_post)
        assert row.probe_ran is False
        assert row.loaded is False
        assert row.reason == "skipped_cold_load"
        assert row.fallbacks == 1
        assert "skipped_cold_load" in row.warnings
    assert posts == []
def test_exit_codes(monkeypatch):
    """A failed proof exits 0 by default and 2 under ``--strict-proof``."""
    failed_row = digest.ServiceRow(
        service="embeddings",
        reachable=True,
        probe_ran=True,
        proof_ok=False,
        warnings=["no_positive_sysfs_delta"],
    )
    rows = [failed_row]
    canned_summary = digest.build_summary(rows, None, 0, "2026-06-05T14:20:00-07:00")

    def fake_run(args):
        return canned_summary, rows

    monkeypatch.setattr(digest, "run", fake_run)

    lenient_code = digest.main(["--no-write"])
    assert lenient_code == 0

    strict_code = digest.main(["--no-write", "--strict-proof"])
    assert strict_code == 2