feat(rag): add OpenVINO NPU embedding services

This commit is contained in:
William Valentin
2026-06-04 13:07:51 -07:00
parent 83d0ced08c
commit 0a6f84fbf3
6 changed files with 651 additions and 0 deletions
+247
View File
@@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
Obsidian Vault Reindex Endpoint
Lightweight HTTP server that triggers incremental or full Obsidian vault reindex.
Listens on 0.0.0.0:18810 (configurable via PORT env var).
Called by n8n webhooks or systemd timers.
Endpoints:
POST /reindex -> trigger incremental reindex, returns JSON stats
POST /reindex?full=true -> trigger full semantic Chroma rebuild
GET /reindex/status -> check last index state
GET /semantic-health -> verify state plus semantic search smoke check
POST /semantic-search -> query the Obsidian Chroma semantic index
GET /healthz -> returns ok
"""
import http.server
import json
import os
import subprocess
import sys
import threading
from pathlib import Path
from urllib.parse import parse_qs, urlparse
PORT = int(os.environ.get("PORT", 18810))
REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
RAG_COLLECTION = os.environ.get("RAG_COLLECTION", "obsidian").strip() or "obsidian"
RAG_EMBED_MODEL = os.environ.get("RAG_EMBED_MODEL", "nomic-embed-text").strip() or "nomic-embed-text"
OLLAMA_BASE_URL = (os.environ.get("OLLAMA_BASE_URL") or "http://127.0.0.1:18807").rstrip("/")
REINDEX_SCRIPT = str(
Path.home()
/ ".hermes/skills/note-taking/rag-search/scripts/reindex_obsidian.sh"
)
STATE_FILE = Path(
os.environ.get("RAG_STATE_FILE")
or Path.home() / ".hermes/data/rag-search" / (
"obsidian_index_state.json" if RAG_COLLECTION == "obsidian" else f"{RAG_COLLECTION}_index_state.json"
)
).expanduser()
SEARCH_SCRIPT = str(Path.home() / ".hermes/skills/note-taking/rag-search/scripts/search.py")
VENV_PYTHON = str(Path.home() / ".hermes/skills/note-taking/rag-search/venv/bin/python")
# Lock to prevent concurrent reindexing
_reindex_lock = threading.Lock()
def run_reindex(full: bool = False) -> dict:
"""Run the reindex script. Returns stats dict."""
if not _reindex_lock.acquire(blocking=False):
return {"error": "reindex already in progress", "status": "locked"}
try:
cmd = [REINDEX_SCRIPT]
if full:
cmd.append("--full")
env = os.environ.copy()
env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
env.setdefault("OLLAMA_BASE_URL", OLLAMA_BASE_URL)
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=REINDEX_TIMEOUT,
env=env,
)
if result.returncode != 0:
return {
"error": "reindex failed",
"exit_code": result.returncode,
"stderr": result.stderr.strip()[-2000:],
}
try:
payload = json.loads(result.stdout)
if result.stderr.strip():
payload["progress_log_tail"] = result.stderr.strip()[-2000:]
return payload
except json.JSONDecodeError:
return {
"error": "invalid json output",
"stdout": result.stdout.strip()[:500],
"stderr": result.stderr.strip()[-2000:],
}
except subprocess.TimeoutExpired:
return {"error": f"reindex timed out ({REINDEX_TIMEOUT}s)"}
except Exception as e:
return {"error": str(e)}
finally:
_reindex_lock.release()
def get_status() -> dict:
"""Read the last index state file."""
if not STATE_FILE.exists():
return {"indexed": False, "message": "no state file"}
try:
return json.loads(STATE_FILE.read_text())
except (json.JSONDecodeError, IOError) as e:
return {"error": str(e)}
def run_semantic_search(query: str, top_k: int = 5) -> dict:
"""Query the local Obsidian Chroma index via the rag-search script."""
query = (query or "").strip()
if not query:
return {"ok": False, "error": "query is required", "results": []}
top_k = max(1, min(int(top_k or 5), 20))
env = os.environ.copy()
env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
env.setdefault("OLLAMA_BASE_URL", OLLAMA_BASE_URL)
result = subprocess.run(
[
VENV_PYTHON if Path(VENV_PYTHON).exists() else sys.executable,
SEARCH_SCRIPT,
"--index",
RAG_COLLECTION,
"--top-k",
str(top_k),
"--raw",
query,
],
capture_output=True,
text=True,
timeout=90,
env=env,
)
if result.returncode != 0:
return {
"ok": False,
"query": query,
"top_k": top_k,
"error": result.stderr.strip()[-2000:] or result.stdout.strip()[-2000:],
"results": [],
}
payload = json.loads(result.stdout)
results = payload.get("results") or []
return {
"ok": True,
"query": query,
"index": payload.get("index", RAG_COLLECTION),
"top_k": top_k,
"result_count": len(results),
"results": results,
}
def semantic_health() -> dict:
"""Return state plus a tiny semantic-search smoke check."""
status = get_status()
health = {
"status": "ok" if status.get("status") == "ok" and status.get("vector_count", 0) > 0 else "degraded",
"state": {
k: status.get(k)
for k in (
"status",
"note_count",
"vector_count",
"collection",
"embedding_backend",
"embedding_model",
"last_full_index",
"last_incremental_index",
)
},
}
try:
payload = run_semantic_search("Obsidian reindex", top_k=1)
health["search_ok"] = bool(payload.get("results"))
health["result_count"] = len(payload.get("results", []))
if not payload.get("ok"):
health["search_error"] = payload.get("error")
except Exception as e:
health["status"] = "degraded"
health["search_ok"] = False
health["search_error"] = str(e)
if not health.get("search_ok"):
health["status"] = "degraded"
return health
class ReindexHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
path = urlparse(self.path).path.rstrip("/")
if path == "/healthz":
self._json_response({"status": "ok"})
elif path == "/reindex/status":
self._json_response(get_status())
elif path in ("/semantic-health", "/reindex/semantic-health"):
data = semantic_health()
self._json_response(data, status=200 if data.get("status") == "ok" else 503)
else:
self._json_response({"error": "not found"}, status=404)
def do_POST(self):
parsed = urlparse(self.path)
path = parsed.path.rstrip("/")
if path == "/reindex":
params = parse_qs(parsed.query)
full = (params.get("full") or [""])[0].lower() in {"1", "true", "yes"}
result = run_reindex(full=full)
status = 200 if "error" not in result else 500
self._json_response(result, status=status)
elif path == "/semantic-search":
try:
length = int(self.headers.get("Content-Length") or 0)
body = self.rfile.read(length).decode("utf-8") if length else "{}"
payload = json.loads(body or "{}")
query = payload.get("query") or payload.get("q") or ""
top_k = payload.get("top_k") or payload.get("topK") or 5
result = run_semantic_search(str(query), int(top_k))
self._json_response(result, status=200 if result.get("ok") else 400)
except json.JSONDecodeError:
self._json_response({"ok": False, "error": "invalid json", "results": []}, status=400)
except Exception as exc:
self._json_response({"ok": False, "error": str(exc), "results": []}, status=500)
else:
self._json_response({"error": "not found"}, status=404)
def _json_response(self, data, status=200):
body = json.dumps(data, indent=2).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
# Minimal logging
pass
def main():
server = http.server.HTTPServer(("0.0.0.0", PORT), ReindexHandler)
print(f"obsidian-reindex-server listening on 0.0.0.0:{PORT}", flush=True)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.server_close()
if __name__ == "__main__":
main()
+236
View File
@@ -0,0 +1,236 @@
#!/usr/bin/env python3
"""OpenVINO GenAI embedding HTTP service for Will's local swarm stack.
Default port: 18817
Default model: OpenVINO/bge-base-en-v1.5-int8-ov, cached under ~/.cache/openvino-models/
Default device: NPU
Exposes a deliberately small compatibility surface:
GET /healthz
GET /api/tags # Ollama-ish model listing for health scripts
POST /api/embed # Ollama-ish batched embeddings
POST /api/embeddings # Ollama-ish single embedding
POST /v1/embeddings # OpenAI-compatible embeddings response
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
import openvino as ov
import openvino_genai as ovg
DEFAULT_MODEL_NAME = "bge-base-en-v1.5-int8-ov"
DEFAULT_MODEL_DIR = Path.home() / ".cache/openvino-models" / DEFAULT_MODEL_NAME
DEFAULT_PORT = 18817
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def npu_busy_time_us() -> int | None:
try:
return int(NPU_BUSY_FILE.read_text().strip())
except Exception:
return None
class EmbeddingService:
def __init__(self, model_dir: Path, model_name: str, device: str, max_length: int) -> None:
self.model_dir = model_dir
self.model_name = model_name
self.device = device
self.max_length = max_length
self.loaded_at = time.time()
self.lock = threading.Lock()
self.embedding_dim: int | None = None
if not self.model_dir.exists():
raise FileNotFoundError(f"model directory not found: {self.model_dir}")
core = ov.Core()
self.available_devices = list(core.available_devices)
if self.device not in self.available_devices:
raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}")
# Intel NPU currently needs static shape for this embedding pipeline.
# batch_size=1 is intentional: multi-input requests are served by looping
# one text at a time, keeping the model shape acceptable to NPUW.
cfg = ovg.TextEmbeddingPipeline.Config()
cfg.max_length = int(max_length)
cfg.pad_to_max_length = True
cfg.batch_size = 1
self.pipeline = ovg.TextEmbeddingPipeline(self.model_dir, self.device, cfg)
def embed_one(self, text: str, *, purpose: str = "query") -> dict[str, Any]:
text = str(text or "")
if not text.strip():
raise ValueError("embedding input text is empty")
if purpose not in {"query", "document"}:
raise ValueError("embedding purpose must be 'query' or 'document'")
before = npu_busy_time_us()
started = time.perf_counter()
# TextEmbeddingPipeline is a native object; serialize calls until proven
# safe under concurrent NPU use. Tiny silicon clown-car avoidance clause.
with self.lock:
if purpose == "document":
# batch_size=1 means embed_documents must receive exactly one doc.
vec = self.pipeline.embed_documents([text])[0]
else:
vec = self.pipeline.embed_query(text)
after = npu_busy_time_us()
vector = [float(x) for x in vec]
self.embedding_dim = len(vector)
return {
"embedding": vector,
"dim": len(vector),
"purpose": purpose,
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"npu_busy_delta_us": None if before is None or after is None else after - before,
}
def health(self) -> dict[str, Any]:
return {
"status": "ok",
"service": "openvino-embeddings",
"model": self.model_name,
"model_dir": str(self.model_dir),
"device": self.device,
"available_devices": self.available_devices,
"embedding_dim": self.embedding_dim,
"max_length": self.max_length,
"uptime_s": round(time.time() - self.loaded_at, 3),
"npu_busy_time_us": npu_busy_time_us(),
}
def normalize_input(value: Any) -> list[str]:
if isinstance(value, str):
return [value]
if isinstance(value, list):
texts = [str(item) for item in value]
if texts:
return texts
raise ValueError("input must be a non-empty string or list of strings")
class Handler(BaseHTTPRequestHandler):
server_version = "OpenVINOEmbeddings/0.1"
@property
def svc(self) -> EmbeddingService:
return self.server.embedding_service # type: ignore[attr-defined]
def do_GET(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path in {"/", "/healthz", "/readyz"}:
self.write_json(self.svc.health())
elif path == "/api/tags":
self.write_json({"models": [{"name": self.svc.model_name, "model": self.svc.model_name}]})
elif path == "/v1/models":
self.write_json({"object": "list", "data": [{"id": self.svc.model_name, "object": "model", "owned_by": "local"}]})
else:
self.write_json({"error": "not found"}, status=404)
def do_POST(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
try:
payload = self.read_json()
if path == "/api/embed":
texts = normalize_input(payload.get("input"))
purpose = str(payload.get("purpose") or payload.get("task") or "document")
results = [self.svc.embed_one(text, purpose=purpose) for text in texts]
self.write_json({
"model": payload.get("model") or self.svc.model_name,
"embeddings": [item["embedding"] for item in results],
"embedding_dim": results[0]["dim"] if results else None,
"purpose": purpose,
"npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
"durations_ms": [item["duration_ms"] for item in results],
})
elif path == "/api/embeddings":
text = payload.get("prompt") or payload.get("input")
result = self.svc.embed_one(str(text or ""), purpose="query")
self.write_json({
"model": payload.get("model") or self.svc.model_name,
"embedding": result["embedding"],
"embedding_dim": result["dim"],
"npu_busy_delta_us": result["npu_busy_delta_us"],
"duration_ms": result["duration_ms"],
})
elif path == "/v1/embeddings":
texts = normalize_input(payload.get("input"))
purpose = str(payload.get("purpose") or payload.get("task") or "query")
results = [self.svc.embed_one(text, purpose=purpose) for text in texts]
self.write_json({
"object": "list",
"model": payload.get("model") or self.svc.model_name,
"data": [
{"object": "embedding", "index": idx, "embedding": item["embedding"]}
for idx, item in enumerate(results)
],
"usage": {"prompt_tokens": 0, "total_tokens": 0},
"embedding_dim": results[0]["dim"] if results else None,
"purpose": purpose,
"npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
"durations_ms": [item["duration_ms"] for item in results],
})
else:
self.write_json({"error": "not found"}, status=404)
except ValueError as exc:
self.write_json({"error": str(exc)}, status=400)
except Exception as exc:
self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500)
def read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length") or 0)
body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
payload = json.loads(body or "{}")
if not isinstance(payload, dict):
raise ValueError("JSON body must be an object")
return payload
def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name
print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--host", default=os.environ.get("OPENVINO_EMBED_HOST", "0.0.0.0"))
parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_EMBED_PORT", DEFAULT_PORT)))
parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_EMBED_MODEL_DIR", str(DEFAULT_MODEL_DIR)))
parser.add_argument("--model-name", default=os.environ.get("OPENVINO_EMBED_MODEL", DEFAULT_MODEL_NAME))
parser.add_argument("--device", default=os.environ.get("OPENVINO_EMBED_DEVICE", "NPU"))
parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_EMBED_MAX_LENGTH", "512")))
args = parser.parse_args()
service = EmbeddingService(Path(args.model_dir).expanduser(), args.model_name, args.device, args.max_length)
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.embedding_service = service # type: ignore[attr-defined]
print(
f"openvino-embeddings listening on {args.host}:{args.port} "
f"model={args.model_name} device={args.device}",
flush=True,
)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
+117
View File
@@ -0,0 +1,117 @@
#!/usr/bin/env python3
"""RAG/embedding health HTTP wrapper for n8n.
Listens on 0.0.0.0:18814 so the n8n container can call it via
http://172.19.0.1:18814.
Endpoints:
GET /healthz -> service liveness
POST /check -> run ~/.hermes/scripts/rag_embedding_health.py and return JSON
"""
from __future__ import annotations
import http.server
import json
import os
import subprocess
import time
from pathlib import Path
PORT = int(os.environ.get("PORT", "18814"))
CHECK_SCRIPT = Path(os.environ.get("RAG_HEALTH_SCRIPT", "/home/will/.hermes/scripts/rag_embedding_health.py"))
TIMEOUT = int(os.environ.get("RAG_HEALTH_TIMEOUT", "180"))
class Handler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path.rstrip("/") == "/healthz":
self._json({"status": "ok", "service": "rag-embedding-health"})
else:
self._json({"error": "not found"}, status=404)
def do_POST(self):
if self.path.rstrip("/") != "/check":
self._json({"error": "not found"}, status=404)
return
started = time.time()
if not CHECK_SCRIPT.exists():
self._json(
{
"ok": False,
"status": "failed",
"exitCode": 127,
"output": f"RAG health script missing: {CHECK_SCRIPT}",
"durationMs": 0,
},
status=200,
)
return
env = os.environ.copy()
env.setdefault("HERMES_HOME", "/home/will/.hermes")
env.setdefault("OLLAMA_BASE_URL", "http://127.0.0.1:18817")
env.setdefault("RAG_EMBED_MODEL", "bge-base-en-v1.5-int8-ov")
env.setdefault("N8N_URL", "http://127.0.0.1:18808")
env.setdefault("OBSIDIAN_REINDEX_URL", "http://127.0.0.1:18810")
try:
proc = subprocess.run(
[str(CHECK_SCRIPT)],
text=True,
capture_output=True,
timeout=TIMEOUT,
check=False,
env=env,
)
output = (proc.stdout or proc.stderr or "").strip()
self._json(
{
"ok": proc.returncode == 0,
"status": "ok" if proc.returncode == 0 else "failed",
"exitCode": proc.returncode,
"output": output[:4000],
"durationMs": int((time.time() - started) * 1000),
},
status=200,
)
except subprocess.TimeoutExpired:
self._json(
{
"ok": False,
"status": "timeout",
"exitCode": 124,
"output": f"RAG/embedding health check timed out after {TIMEOUT}s",
"durationMs": int((time.time() - started) * 1000),
},
status=200,
)
except Exception as exc:
self._json(
{
"ok": False,
"status": "error",
"exitCode": 1,
"output": str(exc)[:4000],
"durationMs": int((time.time() - started) * 1000),
},
status=200,
)
def _json(self, data, status=200):
body = json.dumps(data, indent=2).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
return
if __name__ == "__main__":
server = http.server.HTTPServer(("0.0.0.0", PORT), Handler)
print(f"rag-embedding-health listening on 0.0.0.0:{PORT}", flush=True)
server.serve_forever()
@@ -0,0 +1,16 @@
[Unit]
Description=Obsidian Vault Reindex Endpoint
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /home/will/lab/swarm/scripts/obsidian-reindex-server.py
Restart=on-failure
RestartSec=5
Environment=PORT=18810
Environment=RAG_COLLECTION=obsidian_bge_npu
Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
[Install]
WantedBy=default.target
+19
View File
@@ -0,0 +1,19 @@
[Unit]
Description=OpenVINO NPU Embeddings HTTP Service (port 18817)
After=network.target
[Service]
Type=simple
WorkingDirectory=/home/will/lab/swarm
ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/scripts/openvino-embeddings-server.py
Restart=on-failure
RestartSec=5
Environment=OPENVINO_EMBED_PORT=18817
Environment=OPENVINO_EMBED_HOST=0.0.0.0
Environment=OPENVINO_EMBED_DEVICE=NPU
Environment=OPENVINO_EMBED_MODEL=bge-base-en-v1.5-int8-ov
Environment=OPENVINO_EMBED_MODEL_DIR=/home/will/.cache/openvino-models/bge-base-en-v1.5-int8-ov
Environment=OPENVINO_EMBED_MAX_LENGTH=512
[Install]
WantedBy=default.target
+16
View File
@@ -0,0 +1,16 @@
[Unit]
Description=RAG/Embedding Health HTTP Service (port 18814)
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /home/will/lab/swarm/scripts/rag-embedding-health-server.py
Restart=on-failure
RestartSec=5
Environment=PORT=18814
Environment=RAG_HEALTH_TIMEOUT=180
Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
[Install]
WantedBy=default.target