feat(rag): add OpenVINO NPU embeddings service
This commit is contained in:
Executable
+225
@@ -0,0 +1,225 @@
|
||||
#!/usr/bin/env python3
|
||||
"""OpenVINO GenAI embedding HTTP service for Will's local swarm stack.
|
||||
|
||||
Default port: 18817
|
||||
Default model: OpenVINO/bge-base-en-v1.5-int8-ov, cached under ~/.cache/openvino-models/
|
||||
Default device: NPU
|
||||
|
||||
Exposes a deliberately small compatibility surface:
|
||||
GET /healthz
|
||||
GET /api/tags # Ollama-ish model listing for health scripts
|
||||
POST /api/embed # Ollama-ish batched embeddings
|
||||
POST /api/embeddings # Ollama-ish single embedding
|
||||
POST /v1/embeddings # OpenAI-compatible embeddings response
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import openvino as ov
|
||||
import openvino_genai as ovg
|
||||
|
||||
DEFAULT_MODEL_NAME = "bge-base-en-v1.5-int8-ov"
|
||||
DEFAULT_MODEL_DIR = Path.home() / ".cache/openvino-models" / DEFAULT_MODEL_NAME
|
||||
DEFAULT_PORT = 18817
|
||||
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
|
||||
|
||||
|
||||
def npu_busy_time_us() -> int | None:
|
||||
try:
|
||||
return int(NPU_BUSY_FILE.read_text().strip())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class EmbeddingService:
|
||||
def __init__(self, model_dir: Path, model_name: str, device: str, max_length: int) -> None:
|
||||
self.model_dir = model_dir
|
||||
self.model_name = model_name
|
||||
self.device = device
|
||||
self.max_length = max_length
|
||||
self.loaded_at = time.time()
|
||||
self.lock = threading.Lock()
|
||||
self.embedding_dim: int | None = None
|
||||
|
||||
if not self.model_dir.exists():
|
||||
raise FileNotFoundError(f"model directory not found: {self.model_dir}")
|
||||
|
||||
core = ov.Core()
|
||||
self.available_devices = list(core.available_devices)
|
||||
if self.device not in self.available_devices:
|
||||
raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}")
|
||||
|
||||
# Intel NPU currently needs static shape for this embedding pipeline.
|
||||
# batch_size=1 is intentional: multi-input requests are served by looping
|
||||
# one text at a time, keeping the model shape acceptable to NPUW.
|
||||
cfg = ovg.TextEmbeddingPipeline.Config()
|
||||
cfg.max_length = int(max_length)
|
||||
cfg.pad_to_max_length = True
|
||||
cfg.batch_size = 1
|
||||
self.pipeline = ovg.TextEmbeddingPipeline(self.model_dir, self.device, cfg)
|
||||
|
||||
def embed_one(self, text: str) -> dict[str, Any]:
|
||||
text = str(text or "")
|
||||
if not text.strip():
|
||||
raise ValueError("embedding input text is empty")
|
||||
before = npu_busy_time_us()
|
||||
started = time.perf_counter()
|
||||
# TextEmbeddingPipeline is a native object; serialize calls until proven
|
||||
# safe under concurrent NPU use. Tiny silicon clown-car avoidance clause.
|
||||
with self.lock:
|
||||
vec = self.pipeline.embed_query(text)
|
||||
after = npu_busy_time_us()
|
||||
vector = [float(x) for x in vec]
|
||||
self.embedding_dim = len(vector)
|
||||
return {
|
||||
"embedding": vector,
|
||||
"dim": len(vector),
|
||||
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
|
||||
"npu_busy_delta_us": None if before is None or after is None else after - before,
|
||||
}
|
||||
|
||||
def health(self) -> dict[str, Any]:
|
||||
return {
|
||||
"status": "ok",
|
||||
"service": "openvino-embeddings",
|
||||
"model": self.model_name,
|
||||
"model_dir": str(self.model_dir),
|
||||
"device": self.device,
|
||||
"available_devices": self.available_devices,
|
||||
"embedding_dim": self.embedding_dim,
|
||||
"max_length": self.max_length,
|
||||
"uptime_s": round(time.time() - self.loaded_at, 3),
|
||||
"npu_busy_time_us": npu_busy_time_us(),
|
||||
}
|
||||
|
||||
|
||||
def normalize_input(value: Any) -> list[str]:
|
||||
if isinstance(value, str):
|
||||
return [value]
|
||||
if isinstance(value, list):
|
||||
texts = [str(item) for item in value]
|
||||
if texts:
|
||||
return texts
|
||||
raise ValueError("input must be a non-empty string or list of strings")
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
server_version = "OpenVINOEmbeddings/0.1"
|
||||
|
||||
@property
|
||||
def svc(self) -> EmbeddingService:
|
||||
return self.server.embedding_service # type: ignore[attr-defined]
|
||||
|
||||
def do_GET(self) -> None:
|
||||
path = self.path.split("?", 1)[0].rstrip("/") or "/"
|
||||
if path in {"/", "/healthz", "/readyz"}:
|
||||
self.write_json(self.svc.health())
|
||||
elif path == "/api/tags":
|
||||
self.write_json({"models": [{"name": self.svc.model_name, "model": self.svc.model_name}]})
|
||||
elif path == "/v1/models":
|
||||
self.write_json({"object": "list", "data": [{"id": self.svc.model_name, "object": "model", "owned_by": "local"}]})
|
||||
else:
|
||||
self.write_json({"error": "not found"}, status=404)
|
||||
|
||||
def do_POST(self) -> None:
|
||||
path = self.path.split("?", 1)[0].rstrip("/") or "/"
|
||||
try:
|
||||
payload = self.read_json()
|
||||
if path == "/api/embed":
|
||||
texts = normalize_input(payload.get("input"))
|
||||
results = [self.svc.embed_one(text) for text in texts]
|
||||
self.write_json({
|
||||
"model": payload.get("model") or self.svc.model_name,
|
||||
"embeddings": [item["embedding"] for item in results],
|
||||
"embedding_dim": results[0]["dim"] if results else None,
|
||||
"npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
|
||||
"durations_ms": [item["duration_ms"] for item in results],
|
||||
})
|
||||
elif path == "/api/embeddings":
|
||||
text = payload.get("prompt") or payload.get("input")
|
||||
result = self.svc.embed_one(str(text or ""))
|
||||
self.write_json({
|
||||
"model": payload.get("model") or self.svc.model_name,
|
||||
"embedding": result["embedding"],
|
||||
"embedding_dim": result["dim"],
|
||||
"npu_busy_delta_us": result["npu_busy_delta_us"],
|
||||
"duration_ms": result["duration_ms"],
|
||||
})
|
||||
elif path == "/v1/embeddings":
|
||||
texts = normalize_input(payload.get("input"))
|
||||
results = [self.svc.embed_one(text) for text in texts]
|
||||
self.write_json({
|
||||
"object": "list",
|
||||
"model": payload.get("model") or self.svc.model_name,
|
||||
"data": [
|
||||
{"object": "embedding", "index": idx, "embedding": item["embedding"]}
|
||||
for idx, item in enumerate(results)
|
||||
],
|
||||
"usage": {"prompt_tokens": 0, "total_tokens": 0},
|
||||
"embedding_dim": results[0]["dim"] if results else None,
|
||||
"npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
|
||||
"durations_ms": [item["duration_ms"] for item in results],
|
||||
})
|
||||
else:
|
||||
self.write_json({"error": "not found"}, status=404)
|
||||
except ValueError as exc:
|
||||
self.write_json({"error": str(exc)}, status=400)
|
||||
except Exception as exc:
|
||||
self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500)
|
||||
|
||||
def read_json(self) -> dict[str, Any]:
|
||||
length = int(self.headers.get("Content-Length") or 0)
|
||||
body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
|
||||
payload = json.loads(body or "{}")
|
||||
if not isinstance(payload, dict):
|
||||
raise ValueError("JSON body must be an object")
|
||||
return payload
|
||||
|
||||
def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
|
||||
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name
|
||||
print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--host", default=os.environ.get("OPENVINO_EMBED_HOST", "0.0.0.0"))
|
||||
parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_EMBED_PORT", DEFAULT_PORT)))
|
||||
parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_EMBED_MODEL_DIR", str(DEFAULT_MODEL_DIR)))
|
||||
parser.add_argument("--model-name", default=os.environ.get("OPENVINO_EMBED_MODEL", DEFAULT_MODEL_NAME))
|
||||
parser.add_argument("--device", default=os.environ.get("OPENVINO_EMBED_DEVICE", "NPU"))
|
||||
parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_EMBED_MAX_LENGTH", "512")))
|
||||
args = parser.parse_args()
|
||||
|
||||
service = EmbeddingService(Path(args.model_dir).expanduser(), args.model_name, args.device, args.max_length)
|
||||
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
|
||||
httpd.embedding_service = service # type: ignore[attr-defined]
|
||||
print(
|
||||
f"openvino-embeddings listening on {args.host}:{args.port} "
|
||||
f"model={args.model_name} device={args.device}",
|
||||
flush=True,
|
||||
)
|
||||
try:
|
||||
httpd.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user