Files
swarm-master/scripts/npu-utilization-digest.py
T
2026-06-05 15:52:43 -07:00

537 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""Compact, read-only NPU/OpenVINO utilization digest.
Default behavior is safe for on-demand or scheduled runs: health checks plus
bounded synthetic probes, one compact JSONL artifact, and no service restarts,
routing changes, advisory POSTs, vector mutations, outbound sends, or private
root broadening.
"""
from __future__ import annotations
import argparse
import base64
import datetime as dt
import json
import math
import os
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
import wave
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable
# Sysfs file exposing a cumulative NPU busy-time counter (the ``_us`` suffix
# indicates microseconds). read_busy() samples it before and after each probe;
# a positive difference is the "proof" that the probe actually used the NPU.
BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
# Default directory for the per-run JSONL artifact (overridable with --out).
DEFAULT_OUT_DIR = Path("/home/will/.local/state/npu-utilization/digests")
# Service endpoints. *_HEALTH_URL values are polled with GET by health_row();
# the paired request URLs are only hit by the bounded synthetic probes.
# Embedding service.
EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
EMBED_HEALTH_URL = "http://127.0.0.1:18817/healthz"
# Reranker; its health endpoint is a readiness check (/readyz).
RERANK_URL = "http://127.0.0.1:18818/rerank"
RERANK_HEALTH_URL = "http://127.0.0.1:18818/readyz"
# Speech-to-text service (multipart upload).
WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions"
WHISPER_HEALTH_URL = "http://127.0.0.1:18816/health"
# Event classifier; the probe always requests dry_run.
CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify"
CLASSIFIER_HEALTH_URL = "http://127.0.0.1:18819/healthz"
# Generative model service; only probed when opted in and already loaded.
GENAI_HEALTH_URL = "http://127.0.0.1:18820/healthz"
GENAI_GENERATE_URL = "http://127.0.0.1:18820/v1/generate"
# Document/image triage service.
DOC_TRIAGE_URL = "http://127.0.0.1:18829/triage"
DOC_TRIAGE_HEALTH_URL = "http://127.0.0.1:18829/healthz"
# Health-only services: no requests beyond these GETs are ever sent to them.
RAG_ENDPOINT_HEALTH_URL = "http://127.0.0.1:18810/healthz"
RAG_HEALTH_URL = "http://127.0.0.1:18814/healthz"
# NOTE(review): not loopback. 172.19.0.1 is presumably a container-bridge
# gateway address; confirm.
ADVISORY_HEALTH_URL = "http://172.19.0.1:18830/healthz"
@dataclass
class ServiceRow:
    """Per-service result record; serialized as one JSONL line via compact_dict().

    Optional fields default to None (or an empty list) so compact_dict() can
    drop them when a service does not report that metric.
    """

    type: str = "service"  # record tag; the summary record uses "summary"
    service: str = ""  # short service name, e.g. "embeddings", "whisper"
    reachable: bool = False  # health passed and, if a probe ran, the probe call succeeded
    probe_ran: bool = False  # True once a synthetic request was actually sent
    proof_ok: bool | None = None  # set by apply_proof(): sysfs counter rose during the probe
    calls: int = 0  # number of probe requests issued
    items: int = 0  # service-specific item count (inputs embedded, labels returned, ...)
    avg_ms: float | None = None  # probe latency in ms (rerank prefers server-reported duration)
    npu_delta_us: int | None = None  # sysfs busy-counter delta measured around the probe
    response_delta_us: int | None = None  # busy delta the service reported in its response, if an int
    mode: str = "unavailable"  # e.g. "unavailable", "health_only", "NPU", "dry_run", "loaded=false"
    fallbacks: int = 0  # count of degraded conditions (unavailable, no proof, skipped smoke)
    warnings: list[str] = field(default_factory=list)  # short labels; aggregated in the summary
    gate: str = "none"  # "none" or "closed:<reason>" for deliberately not-exercised actions
    jobs: int | None = None  # whisper/genai job count
    events: int | None = None  # classifier event count
    files: int | None = None  # doc-triage file count
    docs: int | None = None  # rerank document count
    text_len: int | None = None  # length of the whisper transcript text
    sample_rate: int | None = None  # sample rate reported by whisper, if an int
    embedding_count: int | None = None  # number of vectors in the embeddings "data" list
    embedding_dim: int | None = None  # value of the response's "embedding_dim" field
    dry_run: bool | None = None  # classifier probe ran in dry-run mode
    suppress: int | None = None  # classifier suppress verdict as 0/1
    escalate: int | None = None  # classifier escalate verdict as 0/1
    loaded: bool | None = None  # genai health "loaded" flag, when reported
    allowed_roots_count: int | None = None  # number of allowed roots sent to doc triage
    reason: str | None = None  # why a smoke probe was skipped
    error: str | None = None  # short (<=80 chars) error description
def compact_dict(obj: Any) -> dict[str, Any]:
    """Convert a dataclass instance (or mapping) to a dict without empty fields.

    Entries whose value is ``None`` or an empty list are omitted; falsy values
    such as ``0``, ``False`` and ``""`` are kept.
    """
    if hasattr(obj, "__dataclass_fields__"):
        source = asdict(obj)
    else:
        source = dict(obj)
    compact: dict[str, Any] = {}
    for key, value in source.items():
        if value is not None and value != []:
            compact[key] = value
    return compact
def read_busy(path: Path = BUSY_PATH) -> int | None:
    """Read the NPU busy-time counter stored at *path*.

    Returns the file's whitespace-stripped contents as an int, or None when
    the file cannot be read or parsed. Every exception is swallowed so that a
    missing counter degrades the digest instead of aborting it.
    """
    try:
        text = path.read_text()
        value = int(text.strip())
    except Exception:
        return None
    return value
def safe_error(exc: BaseException) -> str:
    """Describe *exc* by its class name only; ``str(exc)`` is deliberately omitted."""
    return exc.__class__.__name__
def http_get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]:
    """GET *url* and decode the reply as a JSON object.

    Returns ``(status, body)`` where *body* is always a dict:

    * success: the HTTP status and the decoded object (an empty body becomes ``{}``);
    * HTTP error status: that status and the decoded error body, or
      ``{"error": "http_error"}`` if the body cannot be read or decoded;
    * transport/decoding failure: ``(0, {"error": <exception class name>})``.

    A JSON value that is not an object (list, string, number, bool, null) is
    replaced by ``{"ok": False, "error": "non_object_json"}``. Callers call
    ``.get`` on the body, so a bare ``true`` or ``[]`` from a misbehaving
    service must not crash the whole digest. Bodies are read up to 1 MiB.
    """

    def _as_object(text: str) -> dict[str, Any]:
        parsed = json.loads(text or "{}")
        if isinstance(parsed, dict):
            return parsed
        return {"ok": False, "error": "non_object_json"}

    try:
        req = urllib.request.Request(url, headers={"Accept": "application/json"})
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            text = resp.read(1024 * 1024).decode("utf-8", "replace")
            return int(resp.status), _as_object(text)
    except urllib.error.HTTPError as exc:
        try:
            text = exc.read(1024 * 1024).decode("utf-8", "replace")
            return int(exc.code), _as_object(text)
        except Exception:
            return int(exc.code), {"error": "http_error"}
    except Exception as exc:
        return 0, {"error": safe_error(exc)}
def http_post_json(url: str, payload: dict[str, Any], timeout: float) -> tuple[int, dict[str, Any]]:
    """POST *payload* as UTF-8 JSON to *url* and decode the reply as a JSON object.

    Returns ``(status, body)`` where *body* is always a dict:

    * success: the HTTP status and the decoded object (up to 2 MiB read);
    * HTTP error status: that status and the decoded error body (up to 1 MiB),
      or ``{"error": "http_error"}`` if it cannot be read or decoded;
    * transport/decoding failure: ``(0, {"error": <exception class name>})``.

    A JSON value that is not an object is replaced by
    ``{"ok": False, "error": "non_object_json"}``. Every probe calls ``.get``
    on the body, so a non-object reply would otherwise crash the digest.
    """

    def _as_object(text: str) -> dict[str, Any]:
        parsed = json.loads(text or "{}")
        if isinstance(parsed, dict):
            return parsed
        return {"ok": False, "error": "non_object_json"}

    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json", "Accept": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            text = resp.read(2 * 1024 * 1024).decode("utf-8", "replace")
            return int(resp.status), _as_object(text)
    except urllib.error.HTTPError as exc:
        try:
            text = exc.read(1024 * 1024).decode("utf-8", "replace")
            return int(exc.code), _as_object(text)
        except Exception:
            return int(exc.code), {"error": "http_error"}
    except Exception as exc:
        return 0, {"error": safe_error(exc)}
def health_row(service: str, url: str, timeout: float, gate: str = "none", mode: str = "health_only") -> tuple[ServiceRow, dict[str, Any]]:
    """Poll a health URL and build the initial ServiceRow for *service*.

    The service counts as healthy when the GET returns 200 and the body does
    not carry ``"ok": false``. A healthy row gets *mode*. An unhealthy row
    gets mode "unavailable", one fallback, an "unavailable" warning, and a
    short error taken from the body's "error"/"ready_error" or the HTTP status.
    Returns the row together with the raw health payload.
    """
    status, payload = http_get_json(url, timeout)
    healthy = status == 200 and payload.get("ok", True) is not False
    if healthy:
        return ServiceRow(service=service, reachable=True, mode=mode, gate=gate), payload
    row = ServiceRow(service=service, reachable=False, mode="unavailable", gate=gate)
    row.fallbacks = 1
    row.warnings.append("unavailable")
    detail = payload.get("error") or payload.get("ready_error") or f"http_{status}"
    row.error = str(detail)[:80]
    return row, payload
def measure_probe(fn: Callable[[], tuple[int, dict[str, Any]]], timeout_label: str, busy_path: Path = BUSY_PATH) -> tuple[int, dict[str, Any], float, int | None]:
    """Run *fn* once, timing it and sampling the NPU busy counter around it.

    Returns ``(status, payload, elapsed_ms, busy_delta)``. ``elapsed_ms`` is
    wall-clock time rounded to 3 decimals. ``busy_delta`` is None if either
    counter read failed. ``timeout_label`` is accepted for call-site
    readability but is not used.
    """
    busy_before = read_busy(busy_path)
    t0 = time.perf_counter()
    status, payload = fn()
    elapsed_ms = round((time.perf_counter() - t0) * 1000, 3)
    busy_after = read_busy(busy_path)
    if busy_before is None or busy_after is None:
        busy_delta = None
    else:
        busy_delta = busy_after - busy_before
    return status, payload, elapsed_ms, busy_delta
def apply_proof(row: ServiceRow, delta: int | None) -> None:
    """Record the sysfs busy delta on *row* and judge whether it proves NPU use.

    Proof succeeds only for a strictly positive delta. Otherwise the row gains
    one fallback plus a warning: "missing_sysfs_counter" when the counter was
    unreadable (delta None), "no_positive_sysfs_delta" when it did not rise.
    """
    row.npu_delta_us = delta
    if delta is None:
        row.proof_ok = False
        warning = "missing_sysfs_counter"
    elif delta > 0:
        row.proof_ok = True
        return
    else:
        row.proof_ok = False
        warning = "no_positive_sysfs_delta"
    row.fallbacks += 1
    row.warnings.append(warning)
def mark_skipped_fallback(row: ServiceRow, reason: str) -> None:
    """Count a skipped or unloaded proof condition as a fallback on *row*.

    Intended only for proof-capable rows whose bounded smoke was disabled or
    skipped (for example to avoid cold-loading a model). Rows that are
    health-only by design should never call this, so their fallbacks stay 0.
    *reason* is appended to the row's warnings.
    """
    row.warnings.append(reason)
    row.fallbacks += 1
def probe_embeddings(timeout: float, busy_path: Path = BUSY_PATH, post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json) -> ServiceRow:
    """Health-check the embedding service, then embed one fixed non-private string.

    The probe succeeds when the POST returns 200 with a "data" key. The row
    records latency, the vector count (0 unless "data" is a list), the
    response's "embedding_dim", any integer "npu_busy_delta_us" the service
    reports, and the sysfs proof.
    """
    row, _health = health_row("embeddings", EMBED_HEALTH_URL, timeout)
    if not row.reachable:
        return row
    request = {"input": "non-private npu utilization digest probe", "model": "bge-base-en-v1.5-int8-ov"}

    def send() -> tuple[int, dict[str, Any]]:
        return post_json(EMBED_URL, request, timeout)

    status, data, elapsed, delta = measure_probe(send, "embeddings", busy_path)
    vectors = data.get("data")
    reported = data.get("npu_busy_delta_us")

    row.probe_ran = True
    row.calls = row.items = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "data" in data
    row.embedding_count = len(vectors) if isinstance(vectors, list) else 0
    row.embedding_dim = data.get("embedding_dim")
    row.response_delta_us = reported if isinstance(reported, int) else None
    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def probe_rerank(timeout: float, busy_path: Path = BUSY_PATH, post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json) -> ServiceRow:
    """Health-check the reranker, then rerank two fixed documents against one query.

    ``avg_ms`` prefers the server-reported "duration_ms" and falls back to the
    client-measured latency when that field is missing, zero, non-numeric or
    non-finite. A malformed server field must never abort the whole digest,
    and NaN/inf would make the JSONL artifact invalid. The call succeeds on
    HTTP 200 unless the body says ``"ok": false``.
    """
    row, _ = health_row("rerank", RERANK_HEALTH_URL, timeout)
    if not row.reachable:
        return row
    docs = ["Intel NPU accelerates OpenVINO inference.", "Bananas ripen on a kitchen counter."]
    payload = {"query": "OpenVINO NPU inference", "documents": docs, "top_k": 2, "return_documents": False}
    status, data, elapsed, delta = measure_probe(lambda: post_json(RERANK_URL, payload, timeout), "rerank", busy_path)
    row.probe_ran = True
    row.calls = 1
    row.docs = len(docs)
    raw_duration = data.get("duration_ms")
    try:
        server_ms = float(raw_duration) if raw_duration else None
    except (TypeError, ValueError, OverflowError):
        server_ms = None
    row.avg_ms = server_ms if server_ms is not None and math.isfinite(server_ms) else elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and data.get("ok", True) is not False
    row.response_delta_us = data.get("npu_busy_delta_us") if isinstance(data.get("npu_busy_delta_us"), int) else None
    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def probe_classifier(timeout: float, busy_path: Path = BUSY_PATH, post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json) -> ServiceRow:
    """Health-check the classifier, then classify one synthetic event in dry-run mode.

    The request sets ``options.dry_run`` and disables evidence. The row
    records latency, the label count, escalate/suppress verdicts (0/1)
    derived from "action" and "labels", the service-reported busy delta
    (first integer among "sysfs_npu_busy_delta_us" / "npu_busy_delta_us"),
    and the sysfs proof. The call succeeds on HTTP 200 without an "error" key.
    """
    row, _ = health_row("classifier", CLASSIFIER_HEALTH_URL, timeout, mode="dry_run")
    if not row.reachable:
        return row
    request = {
        "id": "npu-digest-probe",
        "text": "Non-private cron event: backup completed successfully, no user action required.",
        "options": {"dry_run": True, "include_evidence": False},
    }
    status, data, elapsed, delta = measure_probe(
        lambda: post_json(CLASSIFIER_URL, request, timeout), "classifier", busy_path
    )

    labels = data.get("labels")
    if not isinstance(labels, dict):
        labels = {}
    action = data.get("action")
    if not isinstance(action, dict):
        action = {}
    reported = None
    for key in ("sysfs_npu_busy_delta_us", "npu_busy_delta_us"):
        candidate = data.get(key)
        if isinstance(candidate, int):
            reported = candidate
            break

    row.probe_ran = True
    row.calls = row.events = 1
    row.avg_ms = elapsed
    row.mode = "dry_run"
    row.dry_run = True
    row.reachable = status == 200 and "error" not in data
    row.response_delta_us = reported
    row.escalate = int(any((action.get("escalate"), labels.get("action_required"), labels.get("tool_needed"))))
    row.suppress = int(any((action.get("suppress"), labels.get("no_op"), labels.get("duplicate"))))
    row.items = len(labels)
    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def write_tone_wav(
    path: Path,
    seconds: float = 0.35,
    sample_rate: int = 16000,
    frequency_hz: float = 440.0,
    amplitude: int = 9000,
) -> None:
    """Write a mono 16-bit PCM sine tone to *path* as a WAV file.

    Args:
        path: destination file (overwritten).
        seconds: tone length; the frame count is ``int(seconds * sample_rate)``.
        sample_rate: frames per second written to the WAV header.
        frequency_hz: tone frequency (default 440 Hz).
        amplitude: peak sample value. It must fit in a signed 16-bit sample,
            otherwise ``int.to_bytes`` raises OverflowError.

    Samples are built into one buffer and written with a single
    ``writeframes`` call, rather than one ``writeframesraw`` call per frame.
    With the defaults the output is byte-identical to the historic probe tone.
    """
    frames = int(seconds * sample_rate)
    pcm = bytearray()
    for i in range(frames):
        value = int(amplitude * math.sin(2 * math.pi * frequency_hz * (i / sample_rate)))
        pcm += value.to_bytes(2, byteorder="little", signed=True)
    with wave.open(str(path), "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(sample_rate)
        wav.writeframes(bytes(pcm))
def post_multipart_file(url: str, file_path: Path, timeout: float) -> tuple[int, dict[str, Any]]:
    """Upload *file_path* as a multipart/form-data transcription request.

    Sends the fields ``model=whisper`` and ``response_format=json``, plus the
    file as ``npu-digest.wav`` (audio/wav). Returns ``(status, body)`` with
    the same conventions as the JSON helpers:

    * success: the HTTP status and the decoded JSON object (up to 1 MiB);
    * HTTP error status: that status and its decoded body, or
      ``{"error": "http_error"}``. Previously this case collapsed to status 0
      and lost the server's status and message;
    * transport/decoding failure: ``(0, {"error": <exception class name>})``.

    A non-object JSON reply becomes ``{"ok": False, "error": "non_object_json"}``
    so callers can safely use ``.get``.
    """

    def _as_object(text: str) -> dict[str, Any]:
        parsed = json.loads(text or "{}")
        if isinstance(parsed, dict):
            return parsed
        return {"ok": False, "error": "non_object_json"}

    boundary = "----npu-digest-" + uuid.uuid4().hex
    file_bytes = file_path.read_bytes()
    parts = [
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"model\"\r\n\r\nwhisper\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"response_format\"\r\n\r\njson\r\n".encode(),
        f"--{boundary}\r\nContent-Disposition: form-data; name=\"file\"; filename=\"npu-digest.wav\"\r\nContent-Type: audio/wav\r\n\r\n".encode(),
        file_bytes,
        f"\r\n--{boundary}--\r\n".encode(),
    ]
    req = urllib.request.Request(
        url,
        data=b"".join(parts),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            text = resp.read(1024 * 1024).decode("utf-8", "replace")
            return int(resp.status), _as_object(text)
    except urllib.error.HTTPError as exc:
        try:
            text = exc.read(1024 * 1024).decode("utf-8", "replace")
            return int(exc.code), _as_object(text)
        except Exception:
            return int(exc.code), {"error": "http_error"}
    except Exception as exc:
        return 0, {"error": safe_error(exc)}
def probe_whisper(timeout: float, include_smoke: bool, busy_path: Path = BUSY_PATH) -> ServiceRow:
    """Health-check whisper and, if *include_smoke*, transcribe a synthetic tone.

    The tone is produced by write_tone_wav() in a throwaway temp directory.
    With smoke disabled, a reachable service is reported as "health_only"
    with reason "smoke_disabled" and a "skipped" fallback. When a probe runs,
    the row records latency, transcript length, any integer sample_rate /
    npu_busy_delta_us from the response, and the sysfs proof.
    """
    row, _ = health_row("whisper", WHISPER_HEALTH_URL, timeout)
    row.jobs = 0
    if not row.reachable:
        return row
    if not include_smoke:
        row.mode = "health_only"
        row.reason = "smoke_disabled"
        mark_skipped_fallback(row, "skipped")
        return row
    with tempfile.TemporaryDirectory(prefix="npu-digest-whisper-") as scratch:
        tone = Path(scratch) / "probe.wav"
        write_tone_wav(tone)
        status, data, elapsed, delta = measure_probe(
            lambda: post_multipart_file(WHISPER_URL, tone, timeout), "whisper", busy_path
        )
    reported_rate = data.get("sample_rate")
    reported_delta = data.get("npu_busy_delta_us")
    row.probe_ran = True
    row.calls = row.jobs = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "error" not in data
    row.text_len = len(str(data.get("text") or ""))
    row.sample_rate = reported_rate if isinstance(reported_rate, int) else None
    row.response_delta_us = reported_delta if isinstance(reported_delta, int) else None
    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def probe_genai(timeout: float, include_smoke: bool, busy_path: Path = BUSY_PATH, post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json) -> ServiceRow:
    """Health-check the generative service and optionally run a tiny generation.

    The smoke ("Say pong.", at most 8 new tokens) runs only when
    *include_smoke* is true and health does not report ``loaded: false``.
    This avoids triggering a cold model load. Skipped smokes are recorded as
    fallbacks with reason "smoke_disabled" or "skipped_cold_load".

    Like the other probes, a failed generate call now records a
    "probe_http_failed" warning and a short error. Before, it only flipped
    ``reachable`` and left no diagnostic.
    """
    row, health = health_row("genai", GENAI_HEALTH_URL, timeout)
    row.loaded = bool(health.get("loaded")) if isinstance(health, dict) and "loaded" in health else None
    row.jobs = 0
    if not row.reachable:
        return row
    if not include_smoke or row.loaded is False:
        row.mode = "loaded=false" if row.loaded is False else "health_only"
        row.reason = "skipped_cold_load" if row.loaded is False else "smoke_disabled"
        mark_skipped_fallback(row, row.reason)
        return row
    payload = {"prompt": "Say pong.", "max_new_tokens": 8}
    status, data, elapsed, delta = measure_probe(lambda: post_json(GENAI_GENERATE_URL, payload, timeout), "genai", busy_path)
    row.probe_ran = True
    row.calls = 1
    row.jobs = 1
    row.avg_ms = elapsed
    row.mode = "NPU"
    row.reachable = status == 200 and "error" not in data
    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def doc_triage_sample_path() -> Path | None:
    """Locate the bundled synthetic invoice sample for the doc-triage probe.

    Checks the lab checkout first, then the sibling project next to this
    script's parent directory. A candidate qualifies only if both the PNG and
    its ``.png.txt`` companion exist. Returns None when neither qualifies.
    """
    lab_sample = Path("/home/will/lab/swarm/openvino-doc-image-triage-npu/samples/synthetic_invoice.png")
    repo_sample = Path(__file__).resolve().parents[1] / "openvino-doc-image-triage-npu" / "samples" / "synthetic_invoice.png"
    return next(
        (c for c in (lab_sample, repo_sample) if c.exists() and c.with_suffix(".png.txt").exists()),
        None,
    )
def probe_doc_triage(timeout: float, include_smoke: bool, busy_path: Path = BUSY_PATH, post_json: Callable[..., tuple[int, dict[str, Any]]] = http_post_json) -> ServiceRow:
    """Health-check doc triage and optionally triage one synthetic invoice.

    The request lists only the sample's own directory in ``allowed_roots``
    and disables OCR text in the response. The bundled sample is preferred;
    otherwise a tiny placeholder PNG plus text sidecar is written to a temp
    directory. The NPU proof comes from the sysfs delta. The service-reported
    delta is read from ``result.pages[0].needs_attention.embedding``.
    """
    row, _ = health_row("doc_triage", DOC_TRIAGE_HEALTH_URL, timeout, gate="closed:private-root")
    row.files = 0
    if not (row.reachable and include_smoke):
        if row.reachable:
            row.mode = "health_only"
            row.reason = "smoke_disabled"
            mark_skipped_fallback(row, "skipped")
        return row

    def _triage(sample_path: Path) -> tuple[int, dict[str, Any], float, int | None]:
        # Only the sample's own (resolved) directory is exposed to the service.
        allowed_root = sample_path.parent.resolve()
        request = {"path": str(sample_path), "options": {"allowed_roots": [str(allowed_root)], "include_ocr_text": False, "use_embeddings": True}}
        return measure_probe(lambda: post_json(DOC_TRIAGE_URL, request, timeout), "doc_triage", busy_path)

    bundled = doc_triage_sample_path()
    if bundled is None:
        with tempfile.TemporaryDirectory(prefix="npu-digest-doc-") as tmp:
            scratch = Path(tmp).resolve() / "synthetic-invoice.png"
            scratch.write_bytes(base64.b64decode("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mP8/x8AAwMCAO+/p9sAAAAASUVORK5CYII="))
            scratch.with_suffix(".png.txt").write_text("Synthetic invoice. Amount due $12.34 by 2026-06-30. No private data.\n")
            status, data, elapsed, delta = _triage(scratch)
    else:
        status, data, elapsed, delta = _triage(bundled)

    row.probe_ran = True
    row.calls = row.files = 1
    row.avg_ms = elapsed
    row.mode = "NPU-via-embedding-service"
    row.allowed_roots_count = 1
    row.reachable = status == 200 and data.get("ok", True) is not False

    # Walk result -> pages[0] -> needs_attention -> embedding, tolerating any missing or mistyped level.
    result = data.get("result")
    result = result if isinstance(result, dict) else {}
    pages = result.get("pages")
    pages = pages if isinstance(pages, list) else []
    first_page = pages[0] if pages and isinstance(pages[0], dict) else {}
    attention = first_page.get("needs_attention")
    attention = attention if isinstance(attention, dict) else {}
    embedding = attention.get("embedding")
    embedding = embedding if isinstance(embedding, dict) else {}
    reported = embedding.get("npu_busy_delta_us")
    row.response_delta_us = reported if isinstance(reported, int) else None

    apply_proof(row, delta)
    if not row.reachable:
        row.warnings.append("probe_http_failed")
        row.error = str(data.get("error") or f"http_{status}")[:80]
    return row
def build_summary(rows: list[ServiceRow], artifact_path: str | None, counter_delta_us: int | None, started_at: str) -> dict[str, Any]:
    """Aggregate service rows into the digest's summary record.

    ``proof_total`` counts only rows whose probe ran and produced a proof
    verdict. ``gates_closed`` counts rows whose gate starts with "closed:".
    ``warnings`` maps each warning label to its number of occurrences across
    all rows, in first-seen order.
    """
    proof_rows = [r for r in rows if r.probe_ran and r.proof_ok is not None]
    warning_counts: dict[str, int] = {}
    for label in (w for r in rows for w in r.warnings):
        warning_counts[label] = warning_counts.get(label, 0) + 1
    return dict(
        type="summary",
        timestamp=started_at,
        counter=str(BUSY_PATH),
        delta_us=counter_delta_us,
        services_ok=len([r for r in rows if r.reachable]),
        services_total=len(rows),
        proof_ok=len([r for r in proof_rows if r.proof_ok]),
        proof_total=len(proof_rows),
        fallbacks=sum(r.fallbacks for r in rows),
        gates_closed=sum(1 for r in rows if str(r.gate).startswith("closed:")),
        warnings=warning_counts,
        artifact=artifact_path,
    )
def render_text(summary: dict[str, Any], rows: list[ServiceRow]) -> str:
    """Render the summary and rows as a compact, human-readable text digest.

    Output: three header lines, one ``- <service>: key=value ...`` line per
    row (optional metrics shown only when set), a "fallbacks:" line listing
    the aggregated warning counts (or "none"), and an "artifact:" line when a
    path is known.
    """
    out = [
        f"NPU utilization digest {summary['timestamp']}",
        f"counter={summary['counter']} delta_us={summary.get('delta_us')}",
        f"services_ok={summary['services_ok']}/{summary['services_total']} proof_ok={summary['proof_ok']}/{summary['proof_total']} fallbacks={summary['fallbacks']} gates_closed={summary['gates_closed']}",
    ]

    def lower(value: Any) -> str:
        return str(value).lower()

    # (label, attribute, formatter) for metrics printed only when not None, in output order.
    optional_fields = (
        ("jobs", "jobs", str),
        ("events", "events", str),
        ("files", "files", str),
        ("docs", "docs", str),
        ("avg_ms", "avg_ms", str),
        ("npu_delta_us", "npu_delta_us", str),
        ("proof", "proof_ok", lower),
        ("dry_run", "dry_run", lower),
        ("suppress", "suppress", str),
        ("escalate", "escalate", str),
        ("loaded", "loaded", lower),
        ("allowed_roots", "allowed_roots_count", str),
        ("text_len", "text_len", str),
    )
    for r in rows:
        fields = [f"- {r.service}:", f"ok={lower(r.reachable)}"]
        if r.calls:
            fields.append(f"calls={r.calls}")
        for label, attr, fmt in optional_fields:
            value = getattr(r, attr)
            if value is not None:
                fields.append(f"{label}={fmt(value)}")
        if r.mode:
            fields.append(f"mode={r.mode}")
        if r.gate != "none":
            fields.append(f"gate={r.gate}")
        if r.reason:
            fields.append(f"reason={r.reason}")
        if r.warnings:
            fields.append("warnings=" + ",".join(sorted(set(r.warnings))))
        out.append(" ".join(fields))

    counts = summary.get("warnings") or {}
    if counts:
        out.append("fallbacks: " + " ".join(f"{k}={v}" for k, v in sorted(counts.items())))
    else:
        out.append("fallbacks: none")
    if summary.get("artifact"):
        out.append(f"artifact: {summary['artifact']}")
    return "\n".join(out)
def write_jsonl(summary: dict[str, Any], rows: list[ServiceRow], out_dir: Path) -> Path:
    """Write the summary followed by one compact line per row to a JSONL file.

    *out_dir* is created if needed. The file name is the summary timestamp
    with ":", "+" and "-" removed. Lines use sorted keys and no spaces, and
    the file is written as UTF-8. Returns the artifact path.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    stamp = summary["timestamp"]
    for separator in (":", "+", "-"):
        stamp = stamp.replace(separator, "")
    target = out_dir / f"{stamp}.jsonl"

    def _line(record: dict[str, Any]) -> str:
        return json.dumps(record, sort_keys=True, separators=(",", ":")) + "\n"

    with target.open("w", encoding="utf-8") as fh:
        fh.write(_line(summary))
        for row in rows:
            fh.write(_line(compact_dict(row)))
    return target
def str_bool(value: str) -> bool:
    """argparse ``type=`` converter for case-insensitive boolean flag values.

    Accepts 1/true/yes/y/on and 0/false/no/n/off. Anything else raises
    ``argparse.ArgumentTypeError``.
    """
    verdicts = {
        **dict.fromkeys(("1", "true", "yes", "y", "on"), True),
        **dict.fromkeys(("0", "false", "no", "n", "off"), False),
    }
    verdict = verdicts.get(value.lower())
    if verdict is None:
        raise argparse.ArgumentTypeError("expected true or false")
    return verdict
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the digest's command-line options (``sys.argv[1:]`` when *argv* is None).

    Whisper and doc-triage smokes default to on and the genai smoke to off.
    Each accepts a boolean value parsed by str_bool().
    """
    cli = argparse.ArgumentParser(description="Compact NPU utilization digest")
    cli.add_argument("--format", choices=("text", "jsonl"), default="text")
    cli.add_argument("--out", default=str(DEFAULT_OUT_DIR))
    cli.add_argument("--timeout-s", type=float, default=8.0)
    smoke_defaults = (("whisper", True), ("genai", False), ("doc-triage", True))
    for smoke_name, enabled_by_default in smoke_defaults:
        cli.add_argument(f"--include-{smoke_name}-smoke", type=str_bool, default=enabled_by_default)
    cli.add_argument("--no-write", action="store_true")
    cli.add_argument("--strict-proof", action="store_true", help="exit nonzero if a proof-required probe ran without positive sysfs delta")
    cli.add_argument("--verbose", action="store_true")
    return cli.parse_args(argv)
def run(args: argparse.Namespace) -> tuple[dict[str, Any], list[ServiceRow]]:
    """Run every probe and health check once, in a fixed order, and summarize.

    Returns ``(summary, rows)``. The summary's ``delta_us`` is the busy-counter
    delta over the whole run (None if either read failed). Its ``artifact`` is
    None because nothing has been written yet.
    """
    started_at = dt.datetime.now().astimezone().replace(microsecond=0).isoformat()
    counter_start = read_busy(BUSY_PATH)
    budget = args.timeout_s
    rows = [
        probe_embeddings(budget),
        probe_rerank(budget),
        probe_whisper(budget, args.include_whisper_smoke),
        probe_classifier(budget),
        probe_genai(budget, args.include_genai_smoke),
        probe_doc_triage(budget, args.include_doc_triage_smoke),
    ]
    # Health-only services; the gates record which actions are deliberately never exercised.
    health_only = (
        ("rag_endpoint", RAG_ENDPOINT_HEALTH_URL, "closed:vector-mutation"),
        ("rag_health", RAG_HEALTH_URL, "none"),
        ("advisory_gateway", ADVISORY_HEALTH_URL, "closed:advisory-post"),
    )
    for name, url, gate in health_only:
        checked, _payload = health_row(name, url, budget, gate=gate)
        rows.append(checked)
    counter_end = read_busy(BUSY_PATH)
    if counter_start is None or counter_end is None:
        run_delta = None
    else:
        run_delta = counter_end - counter_start
    return build_summary(rows, artifact_path=None, counter_delta_us=run_delta, started_at=started_at), rows
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: run the digest, optionally persist it, print it, return an exit code.

    Unless ``--no-write`` is given, the JSONL artifact is written and its path
    is stored in ``summary["artifact"]``. ``--format jsonl`` prints the
    summary and compact rows as JSON lines; otherwise the text digest is
    printed. Returns 2 when ``--strict-proof`` is set and a probe ran without
    a positive sysfs delta, else 0. ``--verbose`` is parsed but not read by
    main() or run().
    """
    args = parse_args(argv)
    summary, rows = run(args)

    def _line(record: dict[str, Any]) -> str:
        return json.dumps(record, sort_keys=True, separators=(",", ":"))

    if not args.no_write:
        out_dir = Path(args.out).expanduser()
        artifact = write_jsonl(summary, rows, out_dir)
        summary["artifact"] = str(artifact)
        # The path is only known after the first write. Rewrite through the same
        # writer so the summary line records its own location. The timestamp is
        # unchanged, so this targets the same file with the same UTF-8 encoding.
        write_jsonl(summary, rows, out_dir)
    if args.format == "jsonl":
        print(_line(summary))
        for row in rows:
            print(_line(compact_dict(row)))
    else:
        print(render_text(summary, rows))
    if args.strict_proof and any(r.probe_ran and r.proof_ok is False for r in rows):
        return 2
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer status (0 ok, 2 strict-proof failure) as the process exit code.
    exit_status = main()
    raise SystemExit(exit_status)