From 06832531577b5a1f7c9b31fb1f08c7eeee991013 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 13:07:51 -0700 Subject: [PATCH] feat(npu): add OpenVINO reranker prototype --- openvino-reranker-npu/README.md | 150 +++++++ openvino-reranker-npu/SPEC.md | 243 +++++++++++ .../openvino-reranker.service | 19 + openvino-reranker-npu/server.py | 393 ++++++++++++++++++ openvino-reranker-npu/smoke.py | 167 ++++++++ .../tests/test_server_validation.py | 55 +++ 6 files changed, 1027 insertions(+) create mode 100644 openvino-reranker-npu/README.md create mode 100644 openvino-reranker-npu/SPEC.md create mode 100644 openvino-reranker-npu/openvino-reranker.service create mode 100755 openvino-reranker-npu/server.py create mode 100755 openvino-reranker-npu/smoke.py create mode 100644 openvino-reranker-npu/tests/test_server_validation.py diff --git a/openvino-reranker-npu/README.md b/openvino-reranker-npu/README.md new file mode 100644 index 0000000..b0b7b70 --- /dev/null +++ b/openvino-reranker-npu/README.md @@ -0,0 +1,150 @@ +# OpenVINO NPU reranker service + +Local-first cross-encoder reranker prototype for second-stage RAG ranking. + +- Default bind: `127.0.0.1:18818` +- Default model: `cross-encoder/ms-marco-MiniLM-L6-v2` +- Default device: `NPU` +- Model cache: `/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov/` +- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` delta before/after inference + +This service is intentionally not wired into live RAG by default. + +## Files + +- `SPEC.md` — endpoint/CLI contract, model/runtime recommendation, smoke/NPU proof plan, RAG integration plan, docs implications, and no-go criteria. +- `server.py` — stdlib HTTP OpenVINO Runtime service with fail-fast localhost listener conflict checks and request validation. +- `smoke.py` — non-private API/ranking/NPU busy-time smoke test. 
+- `tests/test_server_validation.py` — stdlib unit checks for request validation and listener conflict detection. +- `openvino-reranker.service` — optional user-systemd unit. + +## One-time setup + +Use a separate venv so the existing Whisper/embeddings NPU venv is not perturbed: + +```bash +python -m venv /home/will/.venvs/openvino-reranker +source /home/will/.venvs/openvino-reranker/bin/activate +python -m pip install -U pip +python -m pip install "openvino>=2026.2" "optimum-intel[openvino]" transformers tokenizers nncf numpy +``` + +Export the model: + +```bash +source /home/will/.venvs/openvino-reranker/bin/activate +optimum-cli export openvino \ + --model cross-encoder/ms-marco-MiniLM-L6-v2 \ + --task text-classification \ + --weight-format int8 \ + --trust-remote-code false \ + /home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov +``` + +If INT8 export or NPU compile fails, export an FP16/FP32 IR to a separate directory and point `OPENVINO_RERANKER_MODEL_DIR` at it while debugging. Do not overwrite existing vector/RAG/Chroma collections. + +## Run in foreground + +Check the port and NPU counter first: + +```bash +ss -ltnp | grep ':18818 ' || true +cat /sys/class/accel/accel0/device/npu_busy_time_us +``` + +Start locally: + +```bash +source /home/will/.venvs/openvino-reranker/bin/activate +OPENVINO_RERANKER_HOST=127.0.0.1 \ +OPENVINO_RERANKER_PORT=18818 \ +OPENVINO_RERANKER_DEVICE=NPU \ +OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov \ +python /home/will/lab/swarm/openvino-reranker-npu/server.py +``` + +Startup performs a non-private smoke inference and fails closed when `OPENVINO_RERANKER_DEVICE=NPU` but `npu_busy_time_us` does not increase. It also checks whether the requested listener can bind before compiling the OpenVINO model, so obvious port conflicts fail fast; the real server bind still happens immediately after model load. 
+ +## API + +Health: + +```bash +curl -sS http://127.0.0.1:18818/healthz | jq +curl -sS http://127.0.0.1:18818/readyz | jq +``` + +Rerank: + +```bash +curl -sS http://127.0.0.1:18818/rerank \ + -H 'Content-Type: application/json' \ + -d '{ + "query":"how do I verify OpenVINO NPU usage?", + "documents":[ + {"id":"good","text":"Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."}, + {"id":"bad","text":"This note is about making sourdough starter."} + ], + "top_k":2 + }' | jq +``` + +Compatibility alias: + +```bash +curl -sS http://127.0.0.1:18818/v1/rerank \ + -H 'Content-Type: application/json' \ + -d '{"model":"local-reranker","query":"npu busy time","documents":["OpenVINO NPU busy time proves accelerator use."],"top_n":1}' | jq +``` + +## Smoke test + +```bash +source /home/will/.venvs/openvino-reranker/bin/activate +python /home/will/lab/swarm/openvino-reranker-npu/smoke.py --url http://127.0.0.1:18818 +``` + +Expected: + +- `/readyz` is HTTP 200 and reports `device=NPU`. +- Each fixture returns `ok=true` and a sorted `results` list. +- The top result matches the non-private fixture expectation. +- Response and sysfs `npu_busy_delta_us` are positive. + +## Validation checks + +```bash +source /home/will/.venvs/openvino-reranker/bin/activate +PYTHONPATH=/home/will/lab/swarm/openvino-reranker-npu \ + python -m unittest discover -s /home/will/lab/swarm/openvino-reranker-npu/tests +``` + +These checks do not compile the OpenVINO model; they cover request validation and fail-fast listener conflict detection. 
+ +## Optional systemd user service + +Install the unit only after the foreground command and smoke test pass: + +```bash +cp /home/will/lab/swarm/openvino-reranker-npu/openvino-reranker.service /home/will/.config/systemd/user/openvino-reranker.service +systemctl --user daemon-reload +systemctl --user start openvino-reranker.service +systemctl --user status openvino-reranker.service --no-pager +journalctl --user -u openvino-reranker.service -n 100 --no-pager +``` + +Do not enable or integrate it into live RAG without explicit approval. + +## Optional RAG integration plan (disabled by default) + +RAG should keep vector search against `obsidian_bge_npu` unchanged, retrieve a larger candidate set, and call this service as a read-only request-time second stage. Suggested disabled-by-default knobs: + +```text +RAG_RERANK_ENABLED=false +RAG_RERANK_URL=http://127.0.0.1:18818/rerank +RAG_RERANK_INITIAL_K=20 +RAG_RERANK_TOP_K=5 +RAG_RERANK_TIMEOUT_MS=3000 +``` + +On reranker timeout/error, fall back to vector order and include metadata such as `rerank_error`; do not mutate or reindex Chroma collections. diff --git a/openvino-reranker-npu/SPEC.md b/openvino-reranker-npu/SPEC.md new file mode 100644 index 0000000..cd32c13 --- /dev/null +++ b/openvino-reranker-npu/SPEC.md @@ -0,0 +1,243 @@ +# OpenVINO NPU reranker service spec + +Status: proposed localhost prototype; not live RAG integration. +Target port: `127.0.0.1:18818`. +Safety posture: foreground smoke first, no persistent enablement, no Atlas/Hermes/RAG routing changes without Will's explicit approval. + +## Recommendation + +Use `cross-encoder/ms-marco-MiniLM-L6-v2`, exported to OpenVINO IR as INT8, served by the local stdlib HTTP service in `server.py` on OpenVINO Runtime `NPU`. + +Why this choice: + +- It is a small BERT-family cross-encoder reranker intended for MS MARCO-style passage ranking, matching the second-stage RAG use case better than another embedding-only similarity pass. 
+- The model shape is simple pairwise text classification/scoring: `(query, document) -> score`, which maps cleanly to OpenVINO Runtime and avoids introducing a heavier LLM worker for reranking. +- INT8 OpenVINO IR keeps memory and compile/runtime cost low enough for a localhost sidecar and is already represented in the repo defaults: + `/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov`. +- The service can fail closed on startup when `OPENVINO_RERANKER_DEVICE=NPU` but `/sys/class/accel/accel0/device/npu_busy_time_us` does not increase, preventing false "NPU-backed" claims. + +Runtime default: + +```text +OPENVINO_RERANKER_HOST=127.0.0.1 +OPENVINO_RERANKER_PORT=18818 +OPENVINO_RERANKER_DEVICE=NPU +OPENVINO_RERANKER_MODEL=cross-encoder/ms-marco-MiniLM-L6-v2 +OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov +OPENVINO_RERANKER_MAX_LENGTH=512 +OPENVINO_RERANKER_MAX_DOCUMENTS=100 +OPENVINO_RERANKER_MAX_BODY_BYTES=5242880 +``` + +## Endpoint contract + +### Health and readiness + +`GET /healthz` and `GET /readyz` return JSON. + +`/readyz` must return HTTP 200 only when the model is loaded and startup smoke passed. For NPU mode, startup smoke must include a positive `npu_busy_delta_us`. 
+ +Representative ready response: + +```json +{ + "status": "ok", + "ok": true, + "service": "openvino-reranker", + "model": "cross-encoder/ms-marco-MiniLM-L6-v2", + "model_dir": "/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov", + "device": "NPU", + "available_devices": ["CPU", "NPU"], + "max_length": 512, + "startup_smoke": {"ok": true, "duration_ms": 12.3, "npu_busy_delta_us": 1234}, + "last_inference": null, + "ready_error": null +} +``` + +### Rerank + +`POST /rerank` and compatibility alias `POST /v1/rerank` accept: + +```json +{ + "query": "how do I verify OpenVINO NPU usage?", + "documents": [ + {"id": "good", "text": "Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference.", "metadata": {"source": "synthetic"}}, + {"id": "bad", "text": "This note is about making sourdough starter."} + ], + "top_k": 2, + "return_documents": false +} +``` + +Compatibility notes: + +- `documents` may be strings or objects with `id`, `text`, and optional object `metadata`. +- `top_k` is preferred; `top_n` is accepted for common reranker-client compatibility. +- `return_documents=false` is recommended for RAG integration to avoid echoing private source text into logs or intermediate traces. +- The optional `model` field may be sent by clients but is not used for routing; this sidecar serves one configured model. 
+ +Successful response: + +```json +{ + "ok": true, + "model": "cross-encoder/ms-marco-MiniLM-L6-v2", + "device": "NPU", + "query": "how do I verify OpenVINO NPU usage?", + "input_count": 2, + "top_k": 2, + "duration_ms": 10.5, + "npu_busy_delta_us": 1234, + "results": [ + {"index": 0, "id": "good", "score": 8.1, "raw_score": 8.1, "probability": 0.9997}, + {"index": 1, "id": "bad", "score": -4.2, "raw_score": -4.2, "probability": 0.0148} + ] +} +``` + +Error response shape: + +```json +{"ok": false, "error": "human-readable error", "results": []} +``` + +Status behavior: + +- 400: invalid JSON schema, empty query, missing/empty documents, invalid document text, or non-positive/non-integer `top_k`/`top_n`. +- 413: request body above `OPENVINO_RERANKER_MAX_BODY_BYTES`. +- 503: model not ready. +- 500: unexpected inference/runtime failure. + +## CLI contract + +Foreground-only review start: + +```bash +ss -ltnp | grep ':18818\b' || true +cat /sys/class/accel/accel0/device/npu_busy_time_us +source /home/will/.venvs/openvino-reranker/bin/activate +OPENVINO_RERANKER_HOST=127.0.0.1 \ +OPENVINO_RERANKER_PORT=18818 \ +OPENVINO_RERANKER_DEVICE=NPU \ +OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov \ +python /home/will/lab/swarm/openvino-reranker-npu/server.py +``` + +Client smoke: + +```bash +source /home/will/.venvs/openvino-reranker/bin/activate +python /home/will/lab/swarm/openvino-reranker-npu/smoke.py --url http://127.0.0.1:18818 +``` + +Optional user-systemd unit exists as `openvino-reranker.service`, but this spec does not approve copying, starting, enabling, or wiring it into live paths. + +## Non-private smoke payload + +Use only synthetic public-text fixtures. Do not query the Obsidian vault, private document directories, image folders, or live Chroma documents during smoke. + +Minimum cases: + +1. 
Query: `how do I verify OpenVINO NPU usage?` + - Expected top document: `Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference.` + - Distractor: `This note is about making sourdough starter.` +2. Query: `what port does the reranker service use?` + - Expected top document: `The OpenVINO reranker prototype listens locally on port 18818.` + - Distractor: `Whisper transcription accepts audio uploads.` +3. Query: `why should reranking not mutate vector collections?` + - Expected top document: `Reranking is a read-only second-stage transformation after vector search.` + - Distractor: `Boil pasta in salted water until al dente.` + +Pass criteria: + +- `/readyz` is HTTP 200 and reports `device=NPU`. +- Every case returns `ok=true` and a sorted `results` list with the expected top `id`. +- Response-level `npu_busy_delta_us` is positive for each case. +- External sysfs `after - before` is positive for each case or at least for the full smoke batch. +- Smoke script exits 0 and prints JSON with `ok: true`. + +## NPU busy-time verification plan + +HTTP 200 is not proof. Verification must capture both endpoint-reported and sysfs-observed deltas. + +Procedure: + +```bash +BUSY=/sys/class/accel/accel0/device/npu_busy_time_us +before=$(cat "$BUSY") +curl -fsS http://127.0.0.1:18818/rerank \ + -H 'Content-Type: application/json' \ + -d '{"query":"how do I verify OpenVINO NPU usage?","documents":[{"id":"good","text":"Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."},{"id":"bad","text":"This note is about making sourdough starter."}],"top_k":2,"return_documents":false}' \ + | jq '{ok, device, npu_busy_delta_us, top_id:.results[0].id}' +after=$(cat "$BUSY") +echo "sysfs_npu_busy_delta_us=$((after-before))" +``` + +Acceptance: + +- `device == "NPU"`. +- Response `npu_busy_delta_us > 0`. +- Shell-computed `sysfs_npu_busy_delta_us > 0`. 
+- If any value is zero/negative/missing, call the result CPU/unknown and do not claim NPU-backed reranking. + +## Optional RAG second-stage integration plan (deferred) + +This is a plan only. Do not enable it in live RAG without explicit approval. + +Design: + +1. Keep existing vector search and Chroma collection `obsidian_bge_npu` unchanged. +2. Retrieve more candidates from current vector search, e.g. `initial_k=20`. +3. Send only request-time candidate snippets/ids to `http://127.0.0.1:18818/rerank`. +4. Use reranker order to choose final `top_k`, e.g. `5`. +5. On timeout, connection error, invalid response, or non-positive NPU proof when proof is required, fall back to vector order and attach metadata like `rerank_error`; do not fail the whole RAG request unless explicitly configured. +6. Log counters and latency, but avoid logging raw private document text. + +Disabled-by-default knobs: + +```text +RAG_RERANK_ENABLED=false +RAG_RERANK_URL=http://127.0.0.1:18818/rerank +RAG_RERANK_INITIAL_K=20 +RAG_RERANK_TOP_K=5 +RAG_RERANK_TIMEOUT_MS=3000 +RAG_RERANK_REQUIRE_NPU_PROOF=true +RAG_RERANK_RETURN_DOCUMENTS=false +``` + +Integration tests should use synthetic in-memory candidates first. Live-vault evaluation requires a separate approval and must not mutate or rebuild the vector collection. + +## Docs and diagram implications + +If this prototype advances beyond spec/review, update these surfaces while keeping live/prototype labels clear: + +- `openvino-reranker-npu/README.md`: keep model/runtime, endpoint contract, smoke command, and approval gates synchronized with code. +- `swarm-common/obsidian-vault/will/will-shared-zap/Runbooks/OpenVINO NPU Services Runbook.md`: list `:18818` as prototype/not enabled, with foreground smoke and NPU sysfs proof. +- Service catalog / architecture notes: show live baseline `:18810`, `:18816`, `:18817`; show `:18818` as optional second-stage RAG prototype, not live routing. 
+- Diagrams: render `RAG :18810 -> optional reranker :18818` as dashed/disabled or "proposed"; do not imply Atlas/Hermes/gateway traffic is using it. +- Optional systemd unit: document as installable after approval, not enabled by default. + +## No-go / defer criteria + +Do not ship, enable, or integrate the reranker if any of these hold: + +- Port `18818` is already owned by another live service. +- `NPU` is unavailable in `ov.Core().available_devices` or `/sys/class/accel/accel0/device/npu_busy_time_us` is missing. +- Foreground startup smoke fails or has non-positive NPU busy-time delta while configured for NPU. +- Synthetic smoke top-1 ranking fails or latency is unacceptable for the intended RAG timeout budget. +- Model export requires overwriting the existing model directory or touching Chroma/vector collections. +- The service must bind beyond `127.0.0.1` to be useful. +- Live RAG integration would require reindexing, collection mutation, private-doc smoke, or Atlas/Hermes/gateway routing changes without explicit approval. +- Logs or responses would persist raw private document text outside the existing RAG request path. + +## Current local preflight observed during this spec pass + +- `/sys/class/accel/accel0/device/npu_busy_time_us` is readable. +- `/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov` is present. +- `/home/will/.venvs/openvino-reranker/bin/python` is present. +- `:18818` was not listening during preflight. +- `server.py` and `smoke.py` pass `python -m py_compile`. + +These observations are preflight only; they are not a live service/NPU smoke result. 
diff --git a/openvino-reranker-npu/openvino-reranker.service b/openvino-reranker-npu/openvino-reranker.service new file mode 100644 index 0000000..f979b9a --- /dev/null +++ b/openvino-reranker-npu/openvino-reranker.service @@ -0,0 +1,19 @@ +[Unit] +Description=OpenVINO NPU Reranker HTTP Service (port 18818) +After=network-online.target + +[Service] +Type=simple +WorkingDirectory=/home/will/lab/swarm/openvino-reranker-npu +Environment=OPENVINO_RERANKER_HOST=127.0.0.1 +Environment=OPENVINO_RERANKER_PORT=18818 +Environment=OPENVINO_RERANKER_MODEL=cross-encoder/ms-marco-MiniLM-L6-v2 +Environment=OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov +Environment=OPENVINO_RERANKER_DEVICE=NPU +Environment=OPENVINO_RERANKER_MAX_LENGTH=512 +ExecStart=/home/will/.venvs/openvino-reranker/bin/python /home/will/lab/swarm/openvino-reranker-npu/server.py +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target diff --git a/openvino-reranker-npu/server.py b/openvino-reranker-npu/server.py new file mode 100755 index 0000000..26cc1bd --- /dev/null +++ b/openvino-reranker-npu/server.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python3 +"""OpenVINO NPU cross-encoder reranker HTTP service. 
DEFAULT_MODEL_ID = "cross-encoder/ms-marco-MiniLM-L6-v2"
DEFAULT_MODEL_DIR = Path("/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov")
DEFAULT_PORT = 18818
DEFAULT_MAX_LENGTH = 512
DEFAULT_MAX_DOCUMENTS = 100
DEFAULT_MAX_BODY_BYTES = 5 * 1024 * 1024
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")


def npu_busy_time_us() -> int | None:
    """Return the integer stored in ``NPU_BUSY_FILE``, or None if it is unreadable.

    Best-effort by design: a missing file, a permission error, or non-integer
    content all yield None so callers can report "no NPU proof" instead of
    crashing.
    """
    try:
        return int(NPU_BUSY_FILE.read_text().strip())
    except (OSError, ValueError):
        # OSError: file missing/unreadable; ValueError: bad encoding or not an int.
        return None


def sigmoid(x: float) -> float:
    """Return the logistic function of *x* without overflowing ``math.exp``.

    The exponent passed to ``math.exp`` is always <= 0, so large-magnitude
    inputs saturate to 0.0 / 1.0 instead of raising OverflowError.
    """
    if x >= 0:
        z = math.exp(-x)
        return 1.0 / (1.0 + z)
    z = math.exp(x)
    return z / (1.0 + z)


def softmax_prob(logits: np.ndarray, index: int = 1) -> float:
    """Return the softmax probability of entry *index* of the flattened *logits*.

    The maximum is subtracted before exponentiation for numerical stability.
    Raises IndexError when *index* is out of range for the flattened logits.
    """
    row = np.asarray(logits, dtype=np.float64).reshape(-1)
    exps = np.exp(row - np.max(row))
    return float(exps[index] / np.sum(exps))


class RerankerService:
    """Score (query, document) pairs with a compiled OpenVINO cross-encoder.

    Construction loads the tokenizer and IR from ``model_dir``, reshapes the
    model to a static ``[1, max_length]`` input, compiles it for ``device`` and,
    unless disabled, runs a one-document startup smoke inference.
    """

    def __init__(
        self,
        model_dir: Path,
        model_id: str,
        device: str,
        max_length: int,
        startup_smoke: bool = True,
    ) -> None:
        """Load, compile and (optionally) smoke-test the model.

        Raises:
            FileNotFoundError: ``model_dir`` or ``openvino_model.xml`` is missing.
            RuntimeError: ``device`` is not among the OpenVINO available devices,
                or the startup smoke on device ``"NPU"`` did not increase the
                NPU busy counter.
        """
        self.model_dir = model_dir
        self.model_id = model_id
        self.device = device
        self.max_length = int(max_length)
        self.loaded_at = time.time()
        self.lock = threading.Lock()
        self.last_inference: dict[str, Any] | None = None
        self.startup_smoke: dict[str, Any] | None = None
        self.ready = False
        self.ready_error: str | None = None

        if not self.model_dir.exists():
            raise FileNotFoundError(f"model directory not found: {self.model_dir}")

        self.core = ov.Core()
        self.available_devices = list(self.core.available_devices)
        if self.device not in self.available_devices:
            raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}")

        xml_path = self.model_dir / "openvino_model.xml"
        if not xml_path.exists():
            raise FileNotFoundError(f"OpenVINO IR not found: {xml_path}")

        self.tokenizer = AutoTokenizer.from_pretrained(str(self.model_dir), local_files_only=True)
        model = self.core.read_model(str(xml_path))
        self._reshape_static(model)
        self.compiled = self.core.compile_model(model, self.device)
        self.input_names = {inp.get_any_name() for inp in self.compiled.inputs}
        self.output = self.compiled.output(0)

        if startup_smoke:
            try:
                smoke = self.rerank(
                    "npu busy time",
                    [{"id": "smoke", "text": "OpenVINO NPU usage is verified by npu_busy_time_us."}],
                    top_k=1,
                    return_documents=False,
                )
                self.startup_smoke = {
                    "ok": bool(smoke.get("ok")),
                    "duration_ms": smoke.get("duration_ms"),
                    "npu_busy_delta_us": smoke.get("npu_busy_delta_us"),
                }
                # Fail closed: an NPU-configured service must show counter movement.
                if self.device == "NPU" and int(smoke.get("npu_busy_delta_us") or 0) <= 0:
                    raise RuntimeError("startup smoke did not increase npu_busy_time_us")
            except Exception as exc:
                self.ready_error = f"startup smoke failed: {type(exc).__name__}: {exc}"
                raise

        self.ready = True

    def _reshape_static(self, model: "ov.Model") -> None:
        """Reshape the known text inputs of *model* to ``[1, max_length]`` in place."""
        shape_by_name: dict[str, list[int]] = {}
        for inp in model.inputs:
            name = inp.get_any_name()
            if name in {"input_ids", "attention_mask", "token_type_ids"}:
                shape_by_name[name] = [1, self.max_length]
        if shape_by_name:
            model.reshape(shape_by_name)

    def _tokenize(self, query: str, document: str) -> dict[str, np.ndarray]:
        """Tokenize one pair, padded/truncated to ``max_length``.

        Only tensors whose names are inputs of the compiled model are returned.
        """
        tokens = self.tokenizer(
            query,
            document,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="np",
        )
        return {name: np.asarray(value) for name, value in tokens.items() if name in self.input_names}

    def _score_pair(self, query: str, document: str) -> dict[str, float | None]:
        """Run one inference and return ``score``, ``raw_score`` and ``probability``.

        A single logit is scored with a sigmoid; with two or more logits the
        entry at index 1 is used and its softmax probability reported.

        Raises:
            RuntimeError: the tokenizer omitted ``input_ids``/``attention_mask``
                that the model requires, or the model returned no logits.
        """
        inputs = self._tokenize(query, document)
        missing = self.input_names - set(inputs)
        # Some exported BERT models do not use token_type_ids. input_ids and attention_mask are required.
        required_missing = missing & {"input_ids", "attention_mask"}
        if required_missing:
            raise RuntimeError(f"tokenizer did not produce required inputs: {sorted(required_missing)}")
        outputs = self.compiled(inputs)
        logits = np.asarray(outputs[self.output])
        flat = logits.reshape(-1)
        if flat.size == 1:
            raw = float(flat[0])
            return {"score": raw, "raw_score": raw, "probability": sigmoid(raw)}
        if flat.size >= 2:
            raw = float(flat[1])
            return {"score": raw, "raw_score": raw, "probability": softmax_prob(flat, 1)}
        raise RuntimeError(f"unexpected empty logits shape: {list(logits.shape)}")

    def rerank(
        self,
        query: str,
        documents: list[dict[str, Any]],
        *,
        top_k: int | None,
        return_documents: bool = True,
    ) -> dict[str, Any]:
        """Score every document against *query* and return the best ``top_k``.

        Args:
            query: Query text.
            documents: Dicts with a ``"text"`` key and optional ``"id"`` /
                ``"metadata"`` keys.
            top_k: Number of results to keep; None keeps all. Clamped to
                ``[1, len(documents)]``.
            return_documents: When true, each result echoes ``text`` and
                ``metadata``.

        Returns:
            Response payload whose ``results`` are sorted by descending score,
            ties broken by original index. ``npu_busy_delta_us`` is None when
            the counter could not be read.
        """
        # Latency is measured from arrival, so it includes time spent waiting
        # for the inference lock.
        started = time.perf_counter()
        results: list[dict[str, Any]] = []
        with self.lock:
            # Sample the counter while holding the lock: sampling before
            # acquiring it would attribute NPU time burned by a concurrent
            # request to this one and yield a false-positive NPU proof.
            before = npu_busy_time_us()
            for idx, doc in enumerate(documents):
                scored = self._score_pair(query, str(doc["text"]))
                item: dict[str, Any] = {
                    "index": idx,
                    "score": scored["score"],
                    "raw_score": scored["raw_score"],
                    "probability": scored["probability"],
                }
                if doc.get("id") is not None:
                    item["id"] = doc.get("id")
                if return_documents:
                    item["text"] = doc["text"]
                    item["metadata"] = doc.get("metadata") if isinstance(doc.get("metadata"), dict) else {}
                results.append(item)
            after = npu_busy_time_us()
        results.sort(key=lambda item: (-float(item["score"]), int(item["index"])))
        clamped_top_k = len(results) if top_k is None else max(1, min(int(top_k), len(results)))
        duration_ms = round((time.perf_counter() - started) * 1000, 3)
        npu_delta = None if before is None or after is None else after - before
        payload = {
            "ok": True,
            "model": self.model_id,
            "model_dir": str(self.model_dir),
            "device": self.device,
            "query": query,
            "input_count": len(documents),
            "top_k": clamped_top_k,
            "duration_ms": duration_ms,
            "npu_busy_delta_us": npu_delta,
            "results": results[:clamped_top_k],
        }
        self.last_inference = {
            "duration_ms": duration_ms,
            "docs": len(documents),
            "npu_busy_delta_us": npu_delta,
        }
        return payload

    def health(self) -> dict[str, Any]:
        """Return a status snapshot; ``ok`` mirrors ``self.ready``."""
        status = "ok" if self.ready else "degraded"
        return {
            "status": status,
            "ok": self.ready,
            "service": "openvino-reranker",
            "model": self.model_id,
            "model_dir": str(self.model_dir),
            "device": self.device,
            "available_devices": self.available_devices,
            "max_length": self.max_length,
            "input_names": sorted(self.input_names),
            "uptime_s": round(time.time() - self.loaded_at, 3),
            "npu_busy_time_us": npu_busy_time_us(),
            "startup_smoke": self.startup_smoke,
            "last_inference": self.last_inference,
            "ready_error": self.ready_error,
        }


def normalize_documents(value: Any, max_documents: int) -> list[dict[str, Any]]:
    """Validate the request ``documents`` field and normalize it to dicts.

    Strings become ``{"text": ...}``; objects become dicts with ``id``,
    ``text`` and ``metadata`` (non-dict metadata is replaced by ``{}``).

    Raises:
        ValueError: *value* is not a non-empty list, exceeds *max_documents*,
            holds an item that is neither string nor object, or an item's text
            is missing/blank.
    """
    if not isinstance(value, list) or not value:
        raise ValueError("documents must be a non-empty list")
    if len(value) > max_documents:
        raise ValueError(f"documents exceeds max_documents={max_documents}")
    docs: list[dict[str, Any]] = []
    for idx, item in enumerate(value):
        if isinstance(item, str):
            text = item
            doc: dict[str, Any] = {"text": text}
        elif isinstance(item, dict):
            text = item.get("text")
            doc = {
                "id": item.get("id"),
                "text": text,
                "metadata": item.get("metadata") if isinstance(item.get("metadata"), dict) else {},
            }
        else:
            raise ValueError(f"documents[{idx}] must be a string or object")
        if not isinstance(text, str) or not text.strip():
            raise ValueError(f"documents[{idx}].text must be a non-empty string")
        docs.append(doc)
    return docs


def parse_top_k(value: Any, document_count: int) -> int:
    """Validate top_k/top_n before inference so schema errors return HTTP 400.

    None means "all documents". Booleans are rejected explicitly because
    ``bool`` is a subclass of ``int``. The result is capped at *document_count*.
    """
    if value is None:
        return document_count
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError("top_k/top_n must be a positive integer")
    if value < 1:
        raise ValueError("top_k/top_n must be a positive integer")
    return min(value, document_count)


def assert_port_available(host: str, port: int) -> None:
    """Fail fast on listener conflicts before compiling the OpenVINO model.

    Binds and immediately closes a probe IPv4 TCP socket. This is only an early
    check: the real server bind happens later and can still fail.

    Raises:
        RuntimeError: the probe bind failed (chained from the ``OSError``).
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            sock.bind((host, port))
        except OSError as exc:
            raise RuntimeError(f"cannot bind {host}:{port}; listener conflict or invalid bind: {exc}") from exc


class RequestTooLarge(ValueError):
    """Request body exceeds the configured limit; mapped to HTTP 413."""


class Handler(BaseHTTPRequestHandler):
    """HTTP handler exposing health/readiness and the rerank endpoints.

    The service object and limits are read from attributes that ``main``
    attaches to the server instance.
    """

    server_version = "OpenVINOReranker/0.1"

    @property
    def svc(self) -> RerankerService:
        return self.server.reranker_service  # type: ignore[attr-defined]

    @property
    def max_body_bytes(self) -> int:
        return self.server.max_body_bytes  # type: ignore[attr-defined]

    @property
    def max_documents(self) -> int:
        return self.server.max_documents  # type: ignore[attr-defined]

    def do_GET(self) -> None:
        """Serve ``/``, ``/healthz`` (alias ``/health``) and ``/readyz``."""
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        if path == "/":
            self.write_json({"ok": True, "service": "openvino-reranker", "endpoints": ["/healthz", "/readyz", "/rerank", "/v1/rerank"]})
        elif path in {"/healthz", "/health"}:
            self.write_json(self.svc.health(), status=200)
        elif path == "/readyz":
            health = self.svc.health()
            self.write_json(health, status=200 if health.get("ok") else 503)
        else:
            self.write_json({"ok": False, "error": "not found", "results": []}, status=404)

    def do_POST(self) -> None:
        """Serve ``/rerank`` and ``/v1/rerank``.

        Status mapping: 404 unknown path, 503 service not ready, 413 body too
        large, 400 validation error, 500 any other exception.
        """
        path = self.path.split("?", 1)[0].rstrip("/") or "/"
        try:
            if path not in {"/rerank", "/v1/rerank"}:
                self.write_json({"ok": False, "error": "not found", "results": []}, status=404)
                return
            if not self.svc.ready:
                self.write_json({"ok": False, "error": self.svc.ready_error or "model not ready", "results": []}, status=503)
                return
            payload = self.read_json()
            query = payload.get("query")
            if not isinstance(query, str) or not query.strip():
                raise ValueError("query is required")
            top_k = payload.get("top_k", payload.get("top_n"))
            documents = normalize_documents(payload.get("documents"), self.max_documents)
            top_k = parse_top_k(top_k, len(documents))
            return_documents = bool(payload.get("return_documents", True))
            response = self.svc.rerank(query.strip(), documents, top_k=top_k, return_documents=return_documents)
            self.write_json(response)
        except RequestTooLarge as exc:
            # Must precede ValueError: RequestTooLarge subclasses it.
            self.write_json({"ok": False, "error": str(exc), "results": []}, status=413)
        except ValueError as exc:
            self.write_json({"ok": False, "error": str(exc), "results": []}, status=400)
        except Exception as exc:
            self.write_json({"ok": False, "error": f"{type(exc).__name__}: {exc}", "results": []}, status=500)

    def read_json(self) -> dict[str, Any]:
        """Read and decode the JSON object in the request body.

        A missing or zero ``Content-Length`` is treated as an empty object.

        Raises:
            RequestTooLarge: declared length exceeds ``max_body_bytes``.
            ValueError: ``Content-Length`` is not a non-negative integer, the
                body is not valid JSON, or the JSON value is not an object.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except (TypeError, ValueError):
            raise ValueError("invalid Content-Length header") from None
        if length < 0:
            # read(-1) would block until the client closes the connection.
            raise ValueError("invalid Content-Length header")
        if length > self.max_body_bytes:
            raise RequestTooLarge(f"request body exceeds {self.max_body_bytes} bytes")
        body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
        payload = json.loads(body or "{}")
        if not isinstance(payload, dict):
            raise ValueError("JSON body must be an object")
        return payload

    def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
        """Send *payload* as a UTF-8 JSON response with the given status."""
        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: Any) -> None:  # noqa: A002 - stdlib override name
        """Write the access-log line to stderr, flushed immediately."""
        print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)


def main() -> int:
    """Parse CLI/env configuration, load the model and serve until interrupted.

    Every option defaults to its ``OPENVINO_RERANKER_*`` environment variable.
    The listener is probed before the model is compiled so port conflicts fail
    fast. Returns 0 on shutdown via KeyboardInterrupt.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default=os.environ.get("OPENVINO_RERANKER_HOST", "127.0.0.1"))
    parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_RERANKER_PORT", DEFAULT_PORT)))
    parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_RERANKER_MODEL_DIR", str(DEFAULT_MODEL_DIR)))
    parser.add_argument("--model", default=os.environ.get("OPENVINO_RERANKER_MODEL", DEFAULT_MODEL_ID))
    parser.add_argument("--device", default=os.environ.get("OPENVINO_RERANKER_DEVICE", "NPU"))
    parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_LENGTH", str(DEFAULT_MAX_LENGTH))))
    parser.add_argument("--max-documents", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_DOCUMENTS", str(DEFAULT_MAX_DOCUMENTS))))
    parser.add_argument("--max-body-bytes", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_BODY_BYTES", str(DEFAULT_MAX_BODY_BYTES))))
    parser.add_argument("--skip-startup-smoke", action="store_true", default=os.environ.get("OPENVINO_RERANKER_SKIP_STARTUP_SMOKE", "").lower() in {"1", "true", "yes"})
    args = parser.parse_args()

    assert_port_available(args.host, args.port)
    service = RerankerService(
        Path(args.model_dir).expanduser(),
        args.model,
        args.device,
        args.max_length,
        startup_smoke=not args.skip_startup_smoke,
    )
    httpd = ThreadingHTTPServer((args.host, args.port), Handler)
    httpd.reranker_service = service  # type: ignore[attr-defined]
    httpd.max_body_bytes = args.max_body_bytes  # type: ignore[attr-defined]
    httpd.max_documents = args.max_documents  # type: ignore[attr-defined]
    print(
        f"openvino-reranker listening on {args.host}:{args.port} model={args.model} "
        f"model_dir={args.model_dir} device={args.device} max_length={args.max_length}",
        flush=True,
    )
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
b/openvino-reranker-npu/smoke.py new file mode 100755 index 0000000..3710160 --- /dev/null +++ b/openvino-reranker-npu/smoke.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +"""Smoke/benchmark checks for the OpenVINO reranker service. + +Prints a JSON summary and exits non-zero on schema/ranking/NPU verification failure. +Uses only non-private fixture text. +""" +from __future__ import annotations + +import argparse +import json +import statistics +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any + +NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") + +FIXTURES = [ + { + "query": "how do I verify OpenVINO NPU usage?", + "documents": [ + {"id": "good", "text": "Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."}, + {"id": "bad", "text": "This note is about making sourdough starter."}, + ], + "expected_top_id": "good", + }, + { + "query": "what port does the reranker service use?", + "documents": [ + {"id": "unrelated", "text": "Whisper transcription accepts audio uploads."}, + {"id": "port", "text": "The OpenVINO reranker prototype listens locally on port 18818."}, + ], + "expected_top_id": "port", + }, + { + "query": "why should reranking not mutate vector collections?", + "documents": [ + {"id": "mutation", "text": "Reranking is a read-only second-stage transformation after vector search."}, + {"id": "cooking", "text": "Boil pasta in salted water until al dente."}, + ], + "expected_top_id": "mutation", + }, +] + + +def npu_busy_time_us() -> int | None: + try: + return int(NPU_BUSY_FILE.read_text().strip()) + except Exception: + return None + + +def post_json(url: str, payload: dict[str, Any], timeout: float) -> tuple[int, dict[str, Any]]: + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST") + try: + with urllib.request.urlopen(req, timeout=timeout) 
as resp: + body = resp.read().decode("utf-8", "replace") + return resp.status, json.loads(body) + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", "replace") + try: + parsed = json.loads(body) + except Exception: + parsed = {"error": body} + return exc.code, parsed + + +def get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]: + try: + with urllib.request.urlopen(url, timeout=timeout) as resp: + body = resp.read().decode("utf-8", "replace") + return resp.status, json.loads(body) + except urllib.error.HTTPError as exc: + body = exc.read().decode("utf-8", "replace") + try: + parsed = json.loads(body) + except Exception: + parsed = {"error": body} + return exc.code, parsed + + +def percentile(values: list[float], pct: float) -> float | None: + if not values: + return None + ordered = sorted(values) + idx = min(len(ordered) - 1, max(0, round((pct / 100.0) * (len(ordered) - 1)))) + return round(ordered[idx], 3) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--url", default="http://127.0.0.1:18818") + parser.add_argument("--timeout", type=float, default=20.0) + parser.add_argument("--allow-cpu", action="store_true", help="do not fail when health reports a non-NPU device") + args = parser.parse_args() + + base = args.url.rstrip("/") + failures: list[str] = [] + health_status, health = get_json(f"{base}/readyz", args.timeout) + if health_status != 200 or not health.get("ok"): + failures.append(f"readyz failed status={health_status} error={health.get('ready_error') or health.get('error')}") + device = health.get("device") + if device != "NPU" and not args.allow_cpu: + failures.append(f"device is {device!r}, expected 'NPU'") + + latencies: list[float] = [] + response_npu_total = 0 + sysfs_npu_total = 0 + top1_passed = 0 + + for case in FIXTURES: + before = npu_busy_time_us() + started = time.perf_counter() + status, payload = post_json( + f"{base}/rerank", + {"query": case["query"], "documents": 
case["documents"], "top_k": len(case["documents"]), "return_documents": False}, + args.timeout, + ) + wall_ms = (time.perf_counter() - started) * 1000 + after = npu_busy_time_us() + latencies.append(float(payload.get("duration_ms") or wall_ms)) + response_delta = payload.get("npu_busy_delta_us") + sysfs_delta = None if before is None or after is None else after - before + if isinstance(response_delta, int): + response_npu_total += response_delta + if isinstance(sysfs_delta, int): + sysfs_npu_total += sysfs_delta + results = payload.get("results") if isinstance(payload, dict) else None + top_id = results[0].get("id") if isinstance(results, list) and results else None + if status != 200 or not payload.get("ok"): + failures.append(f"case {case['expected_top_id']} HTTP/status failed: status={status} error={payload.get('error')}") + if not isinstance(results, list) or len(results) != len(case["documents"]): + failures.append(f"case {case['expected_top_id']} returned invalid results") + if top_id == case["expected_top_id"]: + top1_passed += 1 + else: + failures.append(f"case {case['expected_top_id']} top_id={top_id!r}") + if device == "NPU": + if not isinstance(response_delta, int) or response_delta <= 0: + failures.append(f"case {case['expected_top_id']} response npu delta not positive: {response_delta}") + if not isinstance(sysfs_delta, int) or sysfs_delta <= 0: + failures.append(f"case {case['expected_top_id']} sysfs npu delta not positive: {sysfs_delta}") + + summary = { + "ok": not failures, + "url": base, + "model": health.get("model"), + "device": device, + "cases": len(FIXTURES), + "top1_passed": top1_passed, + "p50_ms": percentile(latencies, 50), + "p95_ms": percentile(latencies, 95), + "mean_ms": round(statistics.mean(latencies), 3) if latencies else None, + "npu_busy_delta_us_total": sysfs_npu_total, + "response_npu_busy_delta_us_total": response_npu_total, + "failures": failures, + } + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 if not 
class ValidationTests(unittest.TestCase):
    """Checks for the request-validation helpers; no OpenVINO model is loaded."""

    def test_normalize_accepts_strings_and_objects(self) -> None:
        # Plain strings and dict documents may be mixed in one request.
        raw_documents = [
            "plain text document",
            {"id": "obj", "text": "object document", "metadata": {"source": "synthetic"}},
        ]
        normalized = normalize_documents(raw_documents, max_documents=2)
        self.assertEqual(normalized[0], {"text": "plain text document"})
        object_doc = normalized[1]
        self.assertEqual(object_doc["id"], "obj")
        self.assertEqual(object_doc["metadata"], {"source": "synthetic"})

    def test_normalize_rejects_empty_or_too_many_documents(self) -> None:
        # Each case pairs an invalid document list with the expected message.
        rejected = (
            ([], "non-empty"),
            (["a", "b", "c"], "max_documents"),
            ([{"id": "empty", "text": ""}], "non-empty string"),
        )
        for documents, message in rejected:
            with self.assertRaisesRegex(ValueError, message):
                normalize_documents(documents, max_documents=2)

    def test_parse_top_k_defaults_clamps_and_rejects_invalid_values(self) -> None:
        # None defaults to the document count; oversized values are clamped.
        for requested, expected in ((None, 3), (2, 2), (99, 3)):
            self.assertEqual(parse_top_k(requested, document_count=3), expected)
        # Booleans, floats and strings must not be accepted as integers.
        invalid_values = (0, -1, True, False, 1.5, "2", "nope")
        for candidate in invalid_values:
            with self.subTest(value=candidate), self.assertRaisesRegex(ValueError, "positive integer"):
                parse_top_k(candidate, document_count=3)

    def test_assert_port_available_detects_listener_conflict(self) -> None:
        # Hold an OS-assigned port open so the availability check must fail.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as occupied:
            occupied.bind(("127.0.0.1", 0))
            occupied.listen(1)
            _, taken_port = occupied.getsockname()
            with self.assertRaisesRegex(RuntimeError, "cannot bind"):
                assert_port_available("127.0.0.1", taken_port)