#!/usr/bin/env python3
"""Smoke/benchmark checks for the OpenVINO reranker service.

Prints a JSON summary and exits non-zero on schema/ranking/NPU verification
failure. Uses only non-private fixture text.
"""

from __future__ import annotations

import argparse
import http.client
import json
import statistics
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any

# sysfs counter sampled before and after each request to confirm NPU activity.
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")

# Each fixture names the document id that must be ranked first for its query.
FIXTURES = [
    {
        "query": "how do I verify OpenVINO NPU usage?",
        "documents": [
            {"id": "good", "text": "Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."},
            {"id": "bad", "text": "This note is about making sourdough starter."},
        ],
        "expected_top_id": "good",
    },
    {
        "query": "what port does the reranker service use?",
        "documents": [
            {"id": "unrelated", "text": "Whisper transcription accepts audio uploads."},
            {"id": "port", "text": "The OpenVINO reranker prototype listens locally on port 18818."},
        ],
        "expected_top_id": "port",
    },
    {
        "query": "why should reranking not mutate vector collections?",
        "documents": [
            {"id": "mutation", "text": "Reranking is a read-only second-stage transformation after vector search."},
            {"id": "cooking", "text": "Boil pasta in salted water until al dente."},
        ],
        "expected_top_id": "mutation",
    },
]


def npu_busy_time_us() -> int | None:
    """Return the integer stored in ``NPU_BUSY_FILE``, or None if it cannot be read.

    A missing/unreadable file or non-integer content is treated as "counter
    unavailable" rather than an error, so the script still runs on hosts
    without the NPU sysfs node.
    """
    try:
        return int(NPU_BUSY_FILE.read_text().strip())
    except (OSError, ValueError):
        return None


def _fetch_json(url: str, timeout: float, payload: dict[str, Any] | None = None) -> tuple[int, dict[str, Any]]:
    """Issue a GET (no payload) or JSON POST and return ``(status, body)``.

    The body is always a dict. Transport failures (refused connection,
    timeout, malformed URL, broken HTTP framing) are reported as status ``0``
    with an ``"error"`` entry instead of raising, so the caller can still
    emit its JSON summary.
    """
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    try:
        if data is None:
            request = urllib.request.Request(url)
        else:
            request = urllib.request.Request(
                url,
                data=data,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
        try:
            with urllib.request.urlopen(request, timeout=timeout) as resp:
                status = resp.status
                raw = resp.read()
        except urllib.error.HTTPError as exc:
            # Non-2xx responses still carry a body worth reporting.
            status = exc.code
            raw = exc.read()
    except (OSError, http.client.HTTPException, ValueError) as exc:
        # OSError covers URLError, socket timeouts and connection resets;
        # ValueError covers URLs that Request() cannot parse.
        return 0, {"error": f"{type(exc).__name__}: {exc}"}

    body = raw.decode("utf-8", "replace")
    try:
        parsed = json.loads(body)
    except ValueError:
        return status, {"error": body}
    if not isinstance(parsed, dict):
        # Callers rely on ``.get``; wrap lists/scalars instead of leaking them.
        return status, {"error": f"unexpected JSON payload: {parsed!r}"}
    return status, parsed


def post_json(url: str, payload: dict[str, Any], timeout: float) -> tuple[int, dict[str, Any]]:
    """POST ``payload`` as JSON to ``url`` and return ``(status, parsed_body)``.

    HTTP error responses return their status code and parsed body (or
    ``{"error": <raw body>}`` when the body is not JSON). Transport failures
    return status ``0`` and ``{"error": <description>}``.
    """
    return _fetch_json(url, timeout, payload)


def get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]:
    """GET ``url`` and return ``(status, parsed_body)``.

    Error handling matches :func:`post_json`.
    """
    return _fetch_json(url, timeout)


def percentile(values: list[float], pct: float) -> float | None:
    """Return the ``pct``-th percentile of ``values`` rounded to 3 decimals.

    Selects the element at index ``round(pct / 100 * (n - 1))`` of the sorted
    values (clamped to the valid range); no interpolation is performed.
    Returns None for an empty list.
    """
    if not values:
        return None
    ordered = sorted(values)
    idx = min(len(ordered) - 1, max(0, round((pct / 100.0) * (len(ordered) - 1))))
    return round(ordered[idx], 3)


def _int_or_none(value: Any) -> int | None:
    """Return ``value`` if it is a real integer (bool excluded), else None."""
    if isinstance(value, int) and not isinstance(value, bool):
        return value
    return None


def _latency_ms(payload: dict[str, Any], wall_ms: float) -> float:
    """Prefer the service-reported ``duration_ms``; fall back to wall time."""
    reported = payload.get("duration_ms")
    if reported:
        try:
            return float(reported)
        except (TypeError, ValueError):
            pass  # malformed field: use the client-side measurement instead
    return wall_ms


def main(argv: list[str] | None = None) -> int:
    """Run the readiness check and every fixture, print a JSON summary.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``0`` when every check passed, ``1`` otherwise.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", default="http://127.0.0.1:18818")
    parser.add_argument("--timeout", type=float, default=20.0)
    parser.add_argument("--allow-cpu", action="store_true", help="do not fail when health reports a non-NPU device")
    args = parser.parse_args(argv)

    base = args.url.rstrip("/")
    failures: list[str] = []

    health_status, health = get_json(f"{base}/readyz", args.timeout)
    if health_status != 200 or not health.get("ok"):
        failures.append(f"readyz failed status={health_status} error={health.get('ready_error') or health.get('error')}")
    device = health.get("device")
    if device != "NPU" and not args.allow_cpu:
        failures.append(f"device is {device!r}, expected 'NPU'")

    latencies: list[float] = []
    response_npu_total = 0
    sysfs_npu_total = 0
    top1_passed = 0

    for case in FIXTURES:
        label = case["expected_top_id"]
        documents = case["documents"]

        # Sample the sysfs counter around the request to observe NPU activity
        # independently of what the service reports.
        before = npu_busy_time_us()
        started = time.perf_counter()
        status, payload = post_json(
            f"{base}/rerank",
            {"query": case["query"], "documents": documents, "top_k": len(documents), "return_documents": False},
            args.timeout,
        )
        wall_ms = (time.perf_counter() - started) * 1000
        after = npu_busy_time_us()

        latencies.append(_latency_ms(payload, wall_ms))

        raw_response_delta = payload.get("npu_busy_delta_us")
        response_delta = _int_or_none(raw_response_delta)
        sysfs_delta = None if before is None or after is None else after - before
        if response_delta is not None:
            response_npu_total += response_delta
        if sysfs_delta is not None:
            sysfs_npu_total += sysfs_delta

        results = payload.get("results")
        first = results[0] if isinstance(results, list) and results else None
        top_id = first.get("id") if isinstance(first, dict) else None

        if status != 200 or not payload.get("ok"):
            failures.append(f"case {label} HTTP/status failed: status={status} error={payload.get('error')}")
        if not isinstance(results, list) or len(results) != len(documents):
            failures.append(f"case {label} returned invalid results")
        if top_id == label:
            top1_passed += 1
        else:
            failures.append(f"case {label} top_id={top_id!r}")
        if device == "NPU":
            if response_delta is None or response_delta <= 0:
                failures.append(f"case {label} response npu delta not positive: {raw_response_delta}")
            if sysfs_delta is None or sysfs_delta <= 0:
                failures.append(f"case {label} sysfs npu delta not positive: {sysfs_delta}")

    summary = {
        "ok": not failures,
        "url": base,
        "model": health.get("model"),
        "device": device,
        "cases": len(FIXTURES),
        "top1_passed": top1_passed,
        "p50_ms": percentile(latencies, 50),
        "p95_ms": percentile(latencies, 95),
        "mean_ms": round(statistics.mean(latencies), 3) if latencies else None,
        "npu_busy_delta_us_total": sysfs_npu_total,
        "response_npu_busy_delta_us_total": response_npu_total,
        "failures": failures,
    }
    print(json.dumps(summary, indent=2, sort_keys=True))
    return 0 if not failures else 1


if __name__ == "__main__":
    raise SystemExit(main())