#!/usr/bin/env python3 """OpenVINO GenAI embedding HTTP service for Will's local swarm stack. Default port: 18817 Default model: OpenVINO/bge-base-en-v1.5-int8-ov, cached under ~/.cache/openvino-models/ Default device: NPU Exposes a deliberately small compatibility surface: GET /healthz GET /api/tags # Ollama-ish model listing for health scripts POST /api/embed # Ollama-ish batched embeddings POST /api/embeddings # Ollama-ish single embedding POST /v1/embeddings # OpenAI-compatible embeddings response """ from __future__ import annotations import argparse import json import os import sys import threading import time from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any import openvino as ov import openvino_genai as ovg DEFAULT_MODEL_NAME = "bge-base-en-v1.5-int8-ov" DEFAULT_MODEL_DIR = Path.home() / ".cache/openvino-models" / DEFAULT_MODEL_NAME DEFAULT_PORT = 18817 NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") def npu_busy_time_us() -> int | None: try: return int(NPU_BUSY_FILE.read_text().strip()) except Exception: return None class EmbeddingService: def __init__(self, model_dir: Path, model_name: str, device: str, max_length: int) -> None: self.model_dir = model_dir self.model_name = model_name self.device = device self.max_length = max_length self.loaded_at = time.time() self.lock = threading.Lock() self.embedding_dim: int | None = None if not self.model_dir.exists(): raise FileNotFoundError(f"model directory not found: {self.model_dir}") core = ov.Core() self.available_devices = list(core.available_devices) if self.device not in self.available_devices: raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}") # Intel NPU currently needs static shape for this embedding pipeline. # batch_size=1 is intentional: multi-input requests are served by looping # one text at a time, keeping the model shape acceptable to NPUW. cfg = ovg.TextEmbeddingPipeline.Config() cfg.max_length = int(max_length) cfg.pad_to_max_length = True cfg.batch_size = 1 self.pipeline = ovg.TextEmbeddingPipeline(self.model_dir, self.device, cfg) def embed_one(self, text: str, *, purpose: str = "query") -> dict[str, Any]: text = str(text or "") if not text.strip(): raise ValueError("embedding input text is empty") if purpose not in {"query", "document"}: raise ValueError("embedding purpose must be 'query' or 'document'") before = npu_busy_time_us() started = time.perf_counter() # TextEmbeddingPipeline is a native object; serialize calls until proven # safe under concurrent NPU use. Tiny silicon clown-car avoidance clause. with self.lock: if purpose == "document": # batch_size=1 means embed_documents must receive exactly one doc. vec = self.pipeline.embed_documents([text])[0] else: vec = self.pipeline.embed_query(text) after = npu_busy_time_us() vector = [float(x) for x in vec] self.embedding_dim = len(vector) return { "embedding": vector, "dim": len(vector), "purpose": purpose, "duration_ms": round((time.perf_counter() - started) * 1000, 3), "npu_busy_delta_us": None if before is None or after is None else after - before, } def health(self) -> dict[str, Any]: return { "status": "ok", "service": "openvino-embeddings", "model": self.model_name, "model_dir": str(self.model_dir), "device": self.device, "available_devices": self.available_devices, "embedding_dim": self.embedding_dim, "max_length": self.max_length, "uptime_s": round(time.time() - self.loaded_at, 3), "npu_busy_time_us": npu_busy_time_us(), } def normalize_input(value: Any) -> list[str]: if isinstance(value, str): return [value] if isinstance(value, list): texts = [str(item) for item in value] if texts: return texts raise ValueError("input must be a non-empty string or list of strings") class Handler(BaseHTTPRequestHandler): server_version = "OpenVINOEmbeddings/0.1" @property def svc(self) -> EmbeddingService: return self.server.embedding_service # type: ignore[attr-defined] def do_GET(self) -> None: path = self.path.split("?", 1)[0].rstrip("/") or "/" if path in {"/", "/healthz", "/readyz"}: self.write_json(self.svc.health()) elif path == "/api/tags": self.write_json({"models": [{"name": self.svc.model_name, "model": self.svc.model_name}]}) elif path == "/v1/models": self.write_json({"object": "list", "data": [{"id": self.svc.model_name, "object": "model", "owned_by": "local"}]}) else: self.write_json({"error": "not found"}, status=404) def do_POST(self) -> None: path = self.path.split("?", 1)[0].rstrip("/") or "/" try: payload = self.read_json() if path == "/api/embed": texts = normalize_input(payload.get("input")) purpose = str(payload.get("purpose") or payload.get("task") or "document") results = [self.svc.embed_one(text, purpose=purpose) for text in texts] self.write_json({ "model": payload.get("model") or self.svc.model_name, "embeddings": [item["embedding"] for item in results], "embedding_dim": results[0]["dim"] if results else None, "purpose": purpose, "npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results), "durations_ms": [item["duration_ms"] for item in results], }) elif path == "/api/embeddings": text = payload.get("prompt") or payload.get("input") result = self.svc.embed_one(str(text or ""), purpose="query") self.write_json({ "model": payload.get("model") or self.svc.model_name, "embedding": result["embedding"], "embedding_dim": result["dim"], "npu_busy_delta_us": result["npu_busy_delta_us"], "duration_ms": result["duration_ms"], }) elif path == "/v1/embeddings": texts = normalize_input(payload.get("input")) purpose = str(payload.get("purpose") or payload.get("task") or "query") results = [self.svc.embed_one(text, purpose=purpose) for text in texts] self.write_json({ "object": "list", "model": payload.get("model") or self.svc.model_name, "data": [ {"object": "embedding", "index": idx, "embedding": item["embedding"]} for idx, item in enumerate(results) ], "usage": {"prompt_tokens": 0, "total_tokens": 0}, "embedding_dim": results[0]["dim"] if results else None, "purpose": purpose, "npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results), "durations_ms": [item["duration_ms"] for item in results], }) else: self.write_json({"error": "not found"}, status=404) except ValueError as exc: self.write_json({"error": str(exc)}, status=400) except Exception as exc: self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500) def read_json(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length") or 0) body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}" payload = json.loads(body or "{}") if not isinstance(payload, dict): raise ValueError("JSON body must be an object") return payload def write_json(self, payload: dict[str, Any], status: int = 200) -> None: body = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--host", default=os.environ.get("OPENVINO_EMBED_HOST", "0.0.0.0")) parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_EMBED_PORT", DEFAULT_PORT))) parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_EMBED_MODEL_DIR", str(DEFAULT_MODEL_DIR))) parser.add_argument("--model-name", default=os.environ.get("OPENVINO_EMBED_MODEL", DEFAULT_MODEL_NAME)) parser.add_argument("--device", default=os.environ.get("OPENVINO_EMBED_DEVICE", "NPU")) parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_EMBED_MAX_LENGTH", "512"))) args = parser.parse_args() service = EmbeddingService(Path(args.model_dir).expanduser(), args.model_name, args.device, args.max_length) httpd = ThreadingHTTPServer((args.host, args.port), Handler) httpd.embedding_service = service # type: ignore[attr-defined] print( f"openvino-embeddings listening on {args.host}:{args.port} " f"model={args.model_name} device={args.device}", flush=True, ) try: httpd.serve_forever() except KeyboardInterrupt: pass return 0 if __name__ == "__main__": raise SystemExit(main())