#!/usr/bin/env python3
"""Compact, read-only NPU/OpenVINO utilization digest.

Default behavior is safe for on-demand or scheduled runs: health checks plus
bounded synthetic probes, one compact JSONL artifact, and no service restarts,
routing changes, advisory POSTs, vector mutations, outbound sends, or private
root broadening.
"""

from __future__ import annotations

import argparse
import base64
import datetime as dt
import json
import math
import os
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
import wave
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable

BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
DEFAULT_OUT_DIR = Path("/home/will/.local/state/npu-utilization/digests")
EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
EMBED_HEALTH_URL = "http://127.0.0.1:18817/healthz"
RERANK_URL = "http://127.0.0.1:18818/rerank"
RERANK_HEALTH_URL = "http://127.0.0.1:18818/readyz"
WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions"
WHISPER_HEALTH_URL = "http://127.0.0.1:18816/health"
CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify"
CLASSIFIER_HEALTH_URL = "http://127.0.0.1:18819/healthz"
GENAI_HEALTH_URL = "http://127.0.0.1:18820/healthz"
GENAI_GENERATE_URL = "http://127.0.0.1:18820/v1/generate"
DOC_TRIAGE_URL = "http://127.0.0.1:18829/triage"
DOC_TRIAGE_HEALTH_URL = "http://127.0.0.1:18829/healthz"
RAG_ENDPOINT_HEALTH_URL = "http://127.0.0.1:18810/healthz"
RAG_HEALTH_URL = "http://127.0.0.1:18814/healthz"
ADVISORY_HEALTH_URL = "http://172.19.0.1:18830/healthz"

# Response-body read caps: health checks, error bodies and multipart replies
# are read up to 1 MiB; successful JSON POST replies up to 2 MiB.
MAX_JSON_BYTES = 1024 * 1024
MAX_POST_JSON_BYTES = 2 * 1024 * 1024

# Base64 of a 1x1 PNG used when no on-disk doc-triage sample is available.
SYNTHETIC_PNG_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII="


@dataclass
class ServiceRow:
    """One per-service record of the digest.

    ``compact_dict`` drops fields that are ``None`` or ``[]`` when the row is
    serialized, so optional fields only appear when a probe filled them in.
    """

    type: str = "service"
    service: str = ""
    reachable: bool = False
    probe_ran: bool = False
    proof_ok: bool | None = None
    calls: int = 0
    items: int = 0
    avg_ms: float | None = None
    npu_delta_us: int | None = None  # sysfs busy-counter delta measured around the probe
    response_delta_us: int | None = None  # delta reported by the service itself, if any
    mode: str = "unavailable"
    fallbacks: int = 0
    warnings: list[str] = field(default_factory=list)
    gate: str = "none"
    jobs: int | None = None
    events: int | None = None
    files: int | None = None
    docs: int | None = None
    text_len: int | None = None
    sample_rate: int | None = None
    embedding_count: int | None = None
    embedding_dim: int | None = None
    dry_run: bool | None = None
    suppress: int | None = None
    escalate: int | None = None
    loaded: bool | None = None
    allowed_roots_count: int | None = None
    reason: str | None = None
    error: str | None = None


def compact_dict(obj: Any) -> dict[str, Any]:
    """Return *obj* (a dataclass instance or mapping) as a dict without ``None``/``[]`` values."""
    data = asdict(obj) if hasattr(obj, "__dataclass_fields__") else dict(obj)
    return {k: v for k, v in data.items() if v is not None and v != []}


def json_line(obj: Any) -> str:
    """Serialize *obj* as one compact, key-sorted JSON line (no trailing newline)."""
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))


def read_busy(path: Path = BUSY_PATH) -> int | None:
    """Read the integer NPU busy-time counter at *path*.

    Returns None when the file is missing/unreadable or does not hold an integer.
    """
    try:
        return int(path.read_text().strip())
    except (OSError, ValueError):
        return None


def safe_error(exc: BaseException) -> str:
    """Return only the exception class name, so messages (which may carry URLs or data) never leak."""
    return type(exc).__name__


class JSONShapeError(ValueError):
    """Raised when a response body is valid JSON but not a JSON object."""


def load_json_object(text: str) -> dict[str, Any]:
    """Decode *text* as a JSON object; an empty body decodes to ``{}``.

    Raises:
        json.JSONDecodeError: if *text* is not valid JSON.
        JSONShapeError: if *text* is valid JSON but not an object. Every caller
            reads the payload with ``.get``, so a list/str/number must not leak through.
    """
    value = json.loads(text or "{}")
    if not isinstance(value, dict):
        raise JSONShapeError(type(value).__name__)
    return value


def http_error_result(exc: urllib.error.HTTPError) -> tuple[int, dict[str, Any]]:
    """Return ``(status, payload)`` for an HTTP error response, keeping its JSON body when decodable."""
    try:
        body = exc.read(MAX_JSON_BYTES).decode("utf-8", "replace")
        return int(exc.code), load_json_object(body)
    except Exception:
        # Best effort: the status code is what matters; the body is optional detail.
        return int(exc.code), {"error": "http_error"}


def http_get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]:
    """GET *url* and return ``(status, json_object)``.

    Never raises: transport/decode failures return ``(0, {"error": <ExceptionName>})``.
    HTTP error statuses return ``(code, body_or_error)``.
    """
    try:
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read(MAX_JSON_BYTES).decode("utf-8", "replace")
            return int(resp.status), load_json_object(body)
    except urllib.error.HTTPError as exc:
        return http_error_result(exc)
    except Exception as exc:
        return 0, {"error": safe_error(exc)}


def http_post_json(url: str, payload: dict[str, Any], timeout: float) -> tuple[int, dict[str, Any]]:
    """POST *payload* as JSON to *url* and return ``(status, json_object)``.

    Same never-raise contract as ``http_get_json``.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = resp.read(MAX_POST_JSON_BYTES).decode("utf-8", "replace")
            return int(resp.status), load_json_object(data)
    except urllib.error.HTTPError as exc:
        return http_error_result(exc)
    except Exception as exc:
        return 0, {"error": safe_error(exc)}


def health_row(
    service: str, url: str, timeout: float, gate: str = "none", mode: str = "health_only"
) -> tuple[ServiceRow, dict[str, Any]]:
    """Run a health check and build the initial row for *service*.

    The service counts as reachable on HTTP 200 unless the payload says ``"ok": false``.
    An unreachable service gets one fallback, an ``unavailable`` warning and a short error label.
    Returns the row plus the raw health payload.
    """
    status, payload = http_get_json(url, timeout)
    ok = status == 200 and payload.get("ok", True) is not False
    row = ServiceRow(service=service, reachable=ok, mode=mode if ok else "unavailable", gate=gate)
    if not ok:
        row.fallbacks = 1
        row.warnings.append("unavailable")
        row.error = str(payload.get("error") or payload.get("ready_error") or f"http_{status}")[:80]
    return row, payload


def measure_probe(
    fn: Callable[[], tuple[int, dict[str, Any]]],
    timeout_label: str,
    busy_path: Path = BUSY_PATH,
) -> tuple[int, dict[str, Any], float, int | None]:
    """Call *fn* while sampling the NPU busy counter before and after.

    Returns ``(status, payload, elapsed_ms, counter_delta_us)``. The delta is
    None if either counter read failed. *timeout_label* is currently unused.
    """
    before = read_busy(busy_path)
    started = time.perf_counter()
    status, payload = fn()
    elapsed_ms = round((time.perf_counter() - started) * 1000, 3)
    after = read_busy(busy_path)
    delta = None if before is None or after is None else after - before
    return status, payload, elapsed_ms, delta


def apply_proof(row: ServiceRow, delta: int | None) -> None:
    """Record the sysfs delta; only a strictly positive delta counts as NPU proof."""
    row.npu_delta_us = delta
    row.proof_ok = bool(delta is not None and delta > 0)
    if not row.proof_ok:
        row.fallbacks += 1
        row.warnings.append("no_positive_sysfs_delta" if delta is not None else "missing_sysfs_counter")


def mark_skipped_fallback(row: ServiceRow, reason: str) -> None:
    """Record a skipped/unloaded proof condition as a fallback.

    Health-only rows that are intentionally never proof probes should keep
    fallbacks at zero. This helper is for proof-capable rows where a bounded
    smoke was disabled or skipped to avoid side effects such as cold-loading.
    """
    row.fallbacks += 1
    row.warnings.append(reason)


def record_probe_failure(row: ServiceRow, status: int, data: dict[str, Any]) -> None:
    """Flag a probe whose HTTP call failed, with a short (<=80 char) error label."""
    row.warnings.append("probe_http_failed")
    row.error = str(data.get("error") or f"http_{status}")[:80]


def int_field(data: dict[str, Any], *keys: str) -> int | None:
    """Return the value of the first of *keys* whose value in *data* is an int, else None."""
    return next((data[k] for k in keys if isinstance(data.get(k), int)), None)


def coerce_ms(value: Any, fallback: float) -> float:
    """Return *value* as finite float milliseconds, or *fallback* if it is falsy, non-numeric or non-finite."""
    if not value:
        return fallback
    try:
        ms = float(value)
    except (TypeError, ValueError):
        return fallback
    return ms if math.isfinite(ms) else fallback


def probe_embeddings(
    timeout: float,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the embedding service, then embed one non-private sentence."""
    row, _ = health_row("embeddings", EMBED_HEALTH_URL, timeout)
    if not row.reachable:
        return row
    payload = {"input": "non-private npu utilization digest probe", "model": "bge-base-en-v1.5-int8-ov"}
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(EMBED_URL, payload, timeout), "embeddings", busy_path
    )
    row.probe_ran = True
    row.calls = 1
    row.items = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "data" in data
    embeddings = data.get("data")
    row.embedding_count = len(embeddings) if isinstance(embeddings, list) else 0
    row.embedding_dim = data.get("embedding_dim")
    row.response_delta_us = int_field(data, "npu_busy_delta_us")
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def probe_rerank(
    timeout: float,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the reranker, then rerank two fixed documents."""
    row, _ = health_row("rerank", RERANK_HEALTH_URL, timeout)
    if not row.reachable:
        return row
    docs = ["Intel NPU accelerates OpenVINO inference.", "Bananas ripen on a kitchen counter."]
    payload = {"query": "OpenVINO NPU inference", "documents": docs, "top_k": 2, "return_documents": False}
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(RERANK_URL, payload, timeout), "rerank", busy_path
    )
    row.probe_ran = True
    row.calls = 1
    row.docs = len(docs)
    # Prefer the service-reported duration; fall back to wall time if it is absent or malformed.
    row.avg_ms = coerce_ms(data.get("duration_ms"), elapsed)
    row.mode = "NPU"
    row.reachable = status == 200 and data.get("ok", True) is not False
    row.response_delta_us = int_field(data, "npu_busy_delta_us")
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def probe_classifier(
    timeout: float,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the classifier, then classify one synthetic event with ``dry_run`` enabled."""
    row, _ = health_row("classifier", CLASSIFIER_HEALTH_URL, timeout, mode="dry_run")
    if not row.reachable:
        return row
    payload = {
        "id": "npu-digest-probe",
        "text": "Non-private cron event: backup completed successfully, no user action required.",
        "options": {"dry_run": True, "include_evidence": False},
    }
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(CLASSIFIER_URL, payload, timeout), "classifier", busy_path
    )
    row.probe_ran = True
    row.calls = 1
    row.events = 1
    row.avg_ms = elapsed
    row.mode = "dry_run"
    row.dry_run = True
    row.reachable = status == 200 and "error" not in data
    row.response_delta_us = int_field(data, "sysfs_npu_busy_delta_us", "npu_busy_delta_us")
    raw_labels = data.get("labels")
    labels: dict[str, Any] = raw_labels if isinstance(raw_labels, dict) else {}
    raw_action = data.get("action")
    action: dict[str, Any] = raw_action if isinstance(raw_action, dict) else {}
    row.escalate = int(bool(action.get("escalate") or labels.get("action_required") or labels.get("tool_needed")))
    row.suppress = int(bool(action.get("suppress") or labels.get("no_op") or labels.get("duplicate")))
    row.items = len(labels)
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def write_tone_wav(path: Path, seconds: float = 0.35, sample_rate: int = 16000) -> None:
    """Write a mono 16-bit PCM WAV containing a 440 Hz sine tone of *seconds* length."""
    frames = int(seconds * sample_rate)
    pcm = b"".join(
        int(9000 * math.sin(2 * math.pi * 440 * (i / sample_rate))).to_bytes(2, byteorder="little", signed=True)
        for i in range(frames)
    )
    with wave.open(str(path), "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(sample_rate)
        wav.writeframes(pcm)


def post_multipart_file(url: str, file_path: Path, timeout: float) -> tuple[int, dict[str, Any]]:
    """POST *file_path* as an OpenAI-style transcription multipart form.

    Returns ``(status, json_object)`` with the same never-raise contract as ``http_post_json``.
    """
    boundary = "----npu-digest-" + uuid.uuid4().hex
    file_bytes = file_path.read_bytes()
    parts = [
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"model\"\r\n\r\nwhisper\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"response_format\"\r\n\r\njson\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"npu-digest.wav\"\r\nContent-Type: audio/wav\r\n\r\n".encode(),
        file_bytes,
        f"\r\n--{boundary}--\r\n".encode(),
    ]
    req = urllib.request.Request(
        url, data=b"".join(parts), headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return int(resp.status), load_json_object(resp.read(MAX_JSON_BYTES).decode("utf-8", "replace"))
    except urllib.error.HTTPError as exc:
        # Keep the real status code and error body, as the JSON helpers do.
        return http_error_result(exc)
    except Exception as exc:
        return 0, {"error": safe_error(exc)}


def probe_whisper(timeout: float, include_smoke: bool, busy_path: Path = BUSY_PATH) -> ServiceRow:
    """Health-check whisper and, if *include_smoke*, transcribe a short synthetic tone."""
    row, _ = health_row("whisper", WHISPER_HEALTH_URL, timeout)
    row.jobs = 0
    if not row.reachable:
        return row
    if not include_smoke:
        row.mode = "health_only"
        row.reason = "smoke_disabled"
        mark_skipped_fallback(row, "skipped")
        return row
    with tempfile.TemporaryDirectory(prefix="npu-digest-whisper-") as tmp:
        wav_path = Path(tmp) / "probe.wav"
        write_tone_wav(wav_path)
        status, data, elapsed, delta = measure_probe(
            lambda: post_multipart_file(WHISPER_URL, wav_path, timeout), "whisper", busy_path
        )
    row.probe_ran = True
    row.calls = 1
    row.jobs = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "error" not in data
    row.text_len = len(str(data.get("text") or ""))
    row.sample_rate = int_field(data, "sample_rate")
    row.response_delta_us = int_field(data, "npu_busy_delta_us")
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def probe_genai(
    timeout: float,
    include_smoke: bool,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check the GenAI service and, if enabled and already loaded, run a tiny generation.

    The smoke is skipped when health reports ``loaded: false``, to avoid triggering a cold load.
    """
    row, health = health_row("genai", GENAI_HEALTH_URL, timeout)
    row.loaded = bool(health.get("loaded")) if isinstance(health, dict) and "loaded" in health else None
    row.jobs = 0
    if not row.reachable:
        return row
    if not include_smoke or row.loaded is False:
        row.mode = "loaded=false" if row.loaded is False else "health_only"
        row.reason = "skipped_cold_load" if row.loaded is False else "smoke_disabled"
        mark_skipped_fallback(row, row.reason)
        return row
    payload = {"prompt": "Say pong.", "max_new_tokens": 8}
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(GENAI_GENERATE_URL, payload, timeout), "genai", busy_path
    )
    row.probe_ran = True
    row.calls = 1
    row.jobs = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "error" not in data
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def doc_triage_sample_path() -> Path | None:
    """Return the first on-disk synthetic invoice PNG that has a ``.png.txt`` sidecar, or None."""
    candidates = [
        Path("/home/will/lab/swarm/openvino-doc-image-triage-npu/samples/synthetic_invoice.png"),
        Path(__file__).resolve().parents[1] / "openvino-doc-image-triage-npu" / "samples" / "synthetic_invoice.png",
    ]
    for candidate in candidates:
        if candidate.exists() and candidate.with_suffix(".png.txt").exists():
            return candidate
    return None


def write_synthetic_doc_sample(root: Path) -> Path:
    """Write a 1x1 PNG and its ``.png.txt`` sidecar under *root*; return the PNG path."""
    sample = root / "synthetic-invoice.png"
    sample.write_bytes(base64.b64decode(SYNTHETIC_PNG_B64))
    sample.with_suffix(".png.txt").write_text("Synthetic invoice. Amount due $12.34 by 2026-06-30. No private data.\n")
    return sample


def run_doc_triage_request(
    sample: Path,
    timeout: float,
    busy_path: Path,
    post_json: Callable[..., tuple[int, dict[str, Any]]],
) -> tuple[int, dict[str, Any], float, int | None]:
    """Triage *sample*, allowing only its own directory as a root, and measure the call."""
    root = sample.parent.resolve()
    payload = {
        "path": str(sample),
        "options": {"allowed_roots": [str(root)], "include_ocr_text": False, "use_embeddings": True},
    }
    return measure_probe(lambda: post_json(DOC_TRIAGE_URL, payload, timeout), "doc_triage", busy_path)


def probe_doc_triage(
    timeout: float,
    include_smoke: bool,
    busy_path: Path = BUSY_PATH,
    post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json,
) -> ServiceRow:
    """Health-check doc triage and, if enabled, triage one synthetic invoice.

    A bundled sample is used when present. Otherwise a throwaway sample is
    written to a temp directory that exists only for the duration of the call.
    """
    row, _ = health_row("doc_triage", DOC_TRIAGE_HEALTH_URL, timeout, gate="closed:private-root")
    row.files = 0
    if not row.reachable:
        return row
    if not include_smoke:
        row.mode = "health_only"
        row.reason = "smoke_disabled"
        mark_skipped_fallback(row, "skipped")
        return row
    sample = doc_triage_sample_path()
    if sample is not None:
        status, data, elapsed, delta = run_doc_triage_request(sample, timeout, busy_path, post_json)
    else:
        with tempfile.TemporaryDirectory(prefix="npu-digest-doc-") as tmp:
            temp_sample = write_synthetic_doc_sample(Path(tmp).resolve())
            status, data, elapsed, delta = run_doc_triage_request(temp_sample, timeout, busy_path, post_json)
    row.probe_ran = True
    row.calls = 1
    row.files = 1
    row.avg_ms = elapsed
    row.mode = "NPU-via-embedding-service"
    row.allowed_roots_count = 1
    row.reachable = status == 200 and data.get("ok", True) is not False
    # Service-reported delta lives at result.pages[0].needs_attention.embedding; every level is optional.
    raw_result = data.get("result")
    result: dict[str, Any] = raw_result if isinstance(raw_result, dict) else {}
    raw_pages = result.get("pages")
    pages: list[Any] = raw_pages if isinstance(raw_pages, list) else []
    embedding: dict[str, Any] = {}
    if pages and isinstance(pages[0], dict):
        raw_attn = pages[0].get("needs_attention")
        attn: dict[str, Any] = raw_attn if isinstance(raw_attn, dict) else {}
        raw_embedding = attn.get("embedding")
        embedding = raw_embedding if isinstance(raw_embedding, dict) else {}
    row.response_delta_us = int_field(embedding, "npu_busy_delta_us")
    apply_proof(row, delta)
    if not row.reachable:
        record_probe_failure(row, status, data)
    return row


def build_summary(
    rows: list[ServiceRow], artifact_path: str | None, counter_delta_us: int | None, started_at: str
) -> dict[str, Any]:
    """Aggregate rows into the summary record (first JSONL line)."""
    services_ok = sum(1 for r in rows if r.reachable)
    proof_rows = [r for r in rows if r.probe_ran and r.proof_ok is not None]
    proof_ok = sum(1 for r in proof_rows if r.proof_ok)
    gates_closed = sum(1 for r in rows if str(r.gate).startswith("closed:"))
    fallbacks = sum(r.fallbacks for r in rows)
    warnings: dict[str, int] = {}
    for row in rows:
        for warning in row.warnings:
            warnings[warning] = warnings.get(warning, 0) + 1
    return {
        "type": "summary",
        "timestamp": started_at,
        "counter": str(BUSY_PATH),
        "delta_us": counter_delta_us,
        "services_ok": services_ok,
        "services_total": len(rows),
        "proof_ok": proof_ok,
        "proof_total": len(proof_rows),
        "fallbacks": fallbacks,
        "gates_closed": gates_closed,
        "warnings": warnings,
        "artifact": artifact_path,
    }


def render_text(summary: dict[str, Any], rows: list[ServiceRow]) -> str:
    """Render the summary and one line per service as a short human-readable report."""
    lines = [
        f"NPU utilization digest {summary['timestamp']}",
        f"counter={summary['counter']} delta_us={summary.get('delta_us')}",
        (
            f"services_ok={summary['services_ok']}/{summary['services_total']} "
            f"proof_ok={summary['proof_ok']}/{summary['proof_total']} "
            f"fallbacks={summary['fallbacks']} gates_closed={summary['gates_closed']}"
        ),
    ]
    for r in rows:
        parts = [f"- {r.service}:", f"ok={str(r.reachable).lower()}"]
        if r.calls:
            parts.append(f"calls={r.calls}")
        if r.jobs is not None:
            parts.append(f"jobs={r.jobs}")
        if r.events is not None:
            parts.append(f"events={r.events}")
        if r.files is not None:
            parts.append(f"files={r.files}")
        if r.docs is not None:
            parts.append(f"docs={r.docs}")
        if r.avg_ms is not None:
            parts.append(f"avg_ms={r.avg_ms}")
        if r.npu_delta_us is not None:
            parts.append(f"npu_delta_us={r.npu_delta_us}")
        if r.proof_ok is not None:
            parts.append(f"proof={str(r.proof_ok).lower()}")
        if r.dry_run is not None:
            parts.append(f"dry_run={str(r.dry_run).lower()}")
        if r.suppress is not None:
            parts.append(f"suppress={r.suppress}")
        if r.escalate is not None:
            parts.append(f"escalate={r.escalate}")
        if r.loaded is not None:
            parts.append(f"loaded={str(r.loaded).lower()}")
        if r.allowed_roots_count is not None:
            parts.append(f"allowed_roots={r.allowed_roots_count}")
        if r.text_len is not None:
            parts.append(f"text_len={r.text_len}")
        if r.mode:
            parts.append(f"mode={r.mode}")
        if r.gate != "none":
            parts.append(f"gate={r.gate}")
        if r.reason:
            parts.append(f"reason={r.reason}")
        if r.warnings:
            parts.append("warnings=" + ",".join(sorted(set(r.warnings))))
        lines.append(" ".join(parts))
    warning_counts = summary.get("warnings") or {}
    if warning_counts:
        lines.append("fallbacks: " + " ".join(f"{k}={v}" for k, v in sorted(warning_counts.items())))
    else:
        lines.append("fallbacks: none")
    if summary.get("artifact"):
        lines.append(f"artifact: {summary['artifact']}")
    return "\n".join(lines)


def artifact_path_for(summary: dict[str, Any], out_dir: Path) -> Path:
    """Return the JSONL artifact path: the summary timestamp with ``:``, ``+`` and ``-`` stripped."""
    stamp = summary["timestamp"].replace(":", "").replace("+", "").replace("-", "")
    return out_dir / f"{stamp}.jsonl"


def write_jsonl(summary: dict[str, Any], rows: list[ServiceRow], out_dir: Path) -> Path:
    """Write the summary line plus one compact line per row; return the artifact path.

    The content goes to a hidden sibling temp file first and is moved into
    place with ``os.replace``, so readers never observe a partially written artifact.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    path = artifact_path_for(summary, out_dir)
    text = "".join(json_line(obj) + "\n" for obj in [summary, *(compact_dict(row) for row in rows)])
    tmp_path = path.with_name(f".{path.name}.tmp")
    try:
        tmp_path.write_text(text, encoding="utf-8")
        os.replace(tmp_path, path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    return path


def str_bool(value: str) -> bool:
    """argparse type: parse common true/false spellings (case-insensitive)."""
    lowered = value.lower()
    if lowered in {"1", "true", "yes", "y", "on"}:
        return True
    if lowered in {"0", "false", "no", "n", "off"}:
        return False
    raise argparse.ArgumentTypeError("expected true or false")


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse command-line options for the digest."""
    parser = argparse.ArgumentParser(description="Compact NPU utilization digest")
    parser.add_argument("--format", choices=("text", "jsonl"), default="text")
    parser.add_argument("--out", default=str(DEFAULT_OUT_DIR))
    parser.add_argument("--timeout-s", type=float, default=8.0)
    parser.add_argument("--include-whisper-smoke", type=str_bool, default=True)
    parser.add_argument("--include-genai-smoke", type=str_bool, default=False)
    parser.add_argument("--include-doc-triage-smoke", type=str_bool, default=True)
    parser.add_argument("--no-write", action="store_true")
    parser.add_argument(
        "--strict-proof",
        action="store_true",
        help="exit nonzero if a proof-required probe ran without positive sysfs delta",
    )
    parser.add_argument("--verbose", action="store_true")
    return parser.parse_args(argv)


def run(args: argparse.Namespace) -> tuple[dict[str, Any], list[ServiceRow]]:
    """Run every probe and health check in order; return ``(summary, rows)``.

    The summary's ``artifact`` is None here; ``main`` fills it in when writing.
    """
    started_at = dt.datetime.now().astimezone().replace(microsecond=0).isoformat()
    before_all = read_busy(BUSY_PATH)
    rows = [
        probe_embeddings(args.timeout_s),
        probe_rerank(args.timeout_s),
        probe_whisper(args.timeout_s, args.include_whisper_smoke),
        probe_classifier(args.timeout_s),
        probe_genai(args.timeout_s, args.include_genai_smoke),
        probe_doc_triage(args.timeout_s, args.include_doc_triage_smoke),
    ]
    # Health-only services: gates mark side-effecting capabilities deliberately left unexercised.
    rows.append(health_row("rag_endpoint", RAG_ENDPOINT_HEALTH_URL, args.timeout_s, gate="closed:vector-mutation")[0])
    rows.append(health_row("rag_health", RAG_HEALTH_URL, args.timeout_s)[0])
    rows.append(health_row("advisory_gateway", ADVISORY_HEALTH_URL, args.timeout_s, gate="closed:advisory-post")[0])
    after_all = read_busy(BUSY_PATH)
    delta_all = None if before_all is None or after_all is None else after_all - before_all
    summary = build_summary(rows, artifact_path=None, counter_delta_us=delta_all, started_at=started_at)
    return summary, rows


def main(argv: list[str] | None = None) -> int:
    """CLI entry point.

    Returns 2 under ``--strict-proof`` when any probe that ran lacked sysfs proof, else 0.
    """
    args = parse_args(argv)
    summary, rows = run(args)
    if not args.no_write:
        out_dir = Path(args.out).expanduser()
        # Resolve the path first so the artifact is written once, already containing its own location.
        summary["artifact"] = str(artifact_path_for(summary, out_dir))
        write_jsonl(summary, rows, out_dir)
    if args.format == "jsonl":
        print(json_line(summary))
        for row in rows:
            print(json_line(compact_dict(row)))
    else:
        print(render_text(summary, rows))
    if args.strict_proof and any(r.probe_ran and r.proof_ok is False for r in rows):
        return 2
    return 0


if __name__ == "__main__":
    raise SystemExit(main())