diff --git a/scripts/obsidian-reindex-server.py b/scripts/obsidian-reindex-server.py
new file mode 100644
index 0000000..6133b8c
--- /dev/null
+++ b/scripts/obsidian-reindex-server.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env python3
+"""
+Obsidian Vault Reindex Endpoint
+Lightweight HTTP server that triggers incremental or full Obsidian vault reindex.
+
+Listens on 0.0.0.0:18810 (configurable via PORT env var).
+Called by n8n webhooks or systemd timers.
+
+Endpoints:
+  POST /reindex            -> trigger incremental reindex, returns JSON stats
+  POST /reindex?full=true  -> trigger full semantic Chroma rebuild
+  GET  /reindex/status     -> check last index state
+  GET  /semantic-health    -> verify state plus semantic search smoke check
+  POST /semantic-search    -> query the Obsidian Chroma semantic index
+  GET  /healthz            -> returns ok
+"""
+
+import http.server
+import json
+import os
+import subprocess
+import sys
+import threading
+from pathlib import Path
+from urllib.parse import parse_qs, urlparse
+
+PORT = int(os.environ.get("PORT", 18810))
+REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
+RAG_COLLECTION = os.environ.get("RAG_COLLECTION", "obsidian").strip() or "obsidian"
+RAG_EMBED_MODEL = os.environ.get("RAG_EMBED_MODEL", "nomic-embed-text").strip() or "nomic-embed-text"
+OLLAMA_BASE_URL = (os.environ.get("OLLAMA_BASE_URL") or "http://127.0.0.1:18807").rstrip("/")
+
+REINDEX_SCRIPT = str(
+    Path.home()
+    / ".hermes/skills/note-taking/rag-search/scripts/reindex_obsidian.sh"
+)
+STATE_FILE = Path(
+    os.environ.get("RAG_STATE_FILE")
+    or Path.home() / ".hermes/data/rag-search" / (
+        "obsidian_index_state.json" if RAG_COLLECTION == "obsidian" else f"{RAG_COLLECTION}_index_state.json"
+    )
+).expanduser()
+SEARCH_SCRIPT = str(Path.home() / ".hermes/skills/note-taking/rag-search/scripts/search.py")
+VENV_PYTHON = str(Path.home() / ".hermes/skills/note-taking/rag-search/venv/bin/python")
+
+# Lock to prevent concurrent reindexing
+_reindex_lock = threading.Lock()
+
+
+def run_reindex(full: bool = False) -> dict:
+    """Run the reindex script. Returns stats dict."""
+    if not _reindex_lock.acquire(blocking=False):
+        return {"error": "reindex already in progress", "status": "locked"}
+
+    try:
+        cmd = [REINDEX_SCRIPT]
+        if full:
+            cmd.append("--full")
+        env = os.environ.copy()
+        env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
+        env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
+        env.setdefault("OLLAMA_BASE_URL", OLLAMA_BASE_URL)
+        result = subprocess.run(
+            cmd,
+            capture_output=True,
+            text=True,
+            timeout=REINDEX_TIMEOUT,
+            env=env,
+        )
+        if result.returncode != 0:
+            return {
+                "error": "reindex failed",
+                "exit_code": result.returncode,
+                "stderr": result.stderr.strip()[-2000:],
+            }
+        try:
+            payload = json.loads(result.stdout)
+            if result.stderr.strip():
+                payload["progress_log_tail"] = result.stderr.strip()[-2000:]
+            return payload
+        except json.JSONDecodeError:
+            return {
+                "error": "invalid json output",
+                "stdout": result.stdout.strip()[:500],
+                "stderr": result.stderr.strip()[-2000:],
+            }
+    except subprocess.TimeoutExpired:
+        return {"error": f"reindex timed out ({REINDEX_TIMEOUT}s)"}
+    except Exception as e:
+        return {"error": str(e)}
+    finally:
+        _reindex_lock.release()
+
+
+def get_status() -> dict:
+    """Read the last index state file.
+
+    Returns the parsed state object, an ``{"indexed": False}`` marker when the
+    file does not exist, or ``{"error": ...}`` when it cannot be read or does
+    not contain a JSON object.
+    """
+    try:
+        state = json.loads(STATE_FILE.read_text(encoding="utf-8"))
+    except FileNotFoundError:
+        return {"indexed": False, "message": "no state file"}
+    except (ValueError, OSError) as e:
+        return {"error": str(e)}
+    if not isinstance(state, dict):
+        return {"error": "state file does not contain a JSON object"}
+    return state
+
+
+def run_semantic_search(query: str, top_k: int = 5) -> dict:
+    """Query the local Obsidian Chroma index via the rag-search script."""
+    query = (query or "").strip()
+    if not query:
+        return {"ok": False, "error": "query is required", "results": []}
+    top_k = max(1, min(int(top_k or 5), 20))
+    env = os.environ.copy()
+    env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
+    env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
+    env.setdefault("OLLAMA_BASE_URL", OLLAMA_BASE_URL)
+    result = subprocess.run(
+        [
+            VENV_PYTHON if Path(VENV_PYTHON).exists() else sys.executable,
+            SEARCH_SCRIPT,
+            "--index",
+            RAG_COLLECTION,
+            "--top-k",
+            str(top_k),
+            "--raw",
+            query,
+        ],
+        capture_output=True,
+        text=True,
+        timeout=90,
+        env=env,
+    )
+    if result.returncode != 0:
+        return {
+            "ok": False,
+            "query": query,
+            "top_k": top_k,
+            "error": result.stderr.strip()[-2000:] or result.stdout.strip()[-2000:],
+            "results": [],
+        }
+    # The script's stdout is untrusted: report malformed output as a failed
+    # search here so callers never mistake it for a bad client request body.
+    try:
+        payload = json.loads(result.stdout)
+    except json.JSONDecodeError:
+        payload = None
+    if not isinstance(payload, dict):
+        return {
+            "ok": False,
+            "query": query,
+            "top_k": top_k,
+            "error": "search script returned invalid JSON",
+            "results": [],
+        }
+    results = payload.get("results") or []
+    return {
+        "ok": True,
+        "query": query,
+        "index": payload.get("index", RAG_COLLECTION),
+        "top_k": top_k,
+        "result_count": len(results),
+        "results": results,
+    }
+
+
+def semantic_health() -> dict:
+    """Return state plus a tiny semantic-search smoke check."""
+    status = get_status()
+    vector_count = status.get("vector_count")
+    # The state file is external data: a missing or non-numeric count must
+    # degrade the report instead of raising out of the health endpoint.
+    has_vectors = isinstance(vector_count, (int, float)) and vector_count > 0
+    health = {
+        "status": "ok" if status.get("status") == "ok" and has_vectors else "degraded",
+        "state": {
+            k: status.get(k)
+            for k in (
+                "status",
+                "note_count",
+                "vector_count",
+                "collection",
+                "embedding_backend",
+                "embedding_model",
+                "last_full_index",
+                "last_incremental_index",
+            )
+        },
+    }
+    try:
+        payload = run_semantic_search("Obsidian reindex", top_k=1)
+        health["search_ok"] = bool(payload.get("results"))
+        health["result_count"] = len(payload.get("results", []))
+        if not payload.get("ok"):
+            health["search_error"] = payload.get("error")
+    except Exception as e:
+        health["status"] = "degraded"
+        health["search_ok"] = False
+        health["search_error"] = str(e)
+    if not health.get("search_ok"):
+        health["status"] = "degraded"
+    return health
+
+
+class ReindexHandler(http.server.BaseHTTPRequestHandler):
+    """Route the reindex, status, health and semantic-search endpoints."""
+
+    def do_GET(self):
+        path = urlparse(self.path).path.rstrip("/")
+        if path == "/healthz":
+            self._json_response({"status": "ok"})
+        elif path == "/reindex/status":
+            self._json_response(get_status())
+        elif path in ("/semantic-health", "/reindex/semantic-health"):
+            data = semantic_health()
+            self._json_response(data, status=200 if data.get("status") == "ok" else 503)
+        else:
+            self._json_response({"error": "not found"}, status=404)
+
+    def do_POST(self):
+        parsed = urlparse(self.path)
+        path = parsed.path.rstrip("/")
+        if path == "/reindex":
+            params = parse_qs(parsed.query)
+            full = (params.get("full") or [""])[0].lower() in {"1", "true", "yes"}
+            result = run_reindex(full=full)
+            status = 200 if "error" not in result else 500
+            self._json_response(result, status=status)
+        elif path == "/semantic-search":
+            try:
+                length = int(self.headers.get("Content-Length") or 0)
+                body = self.rfile.read(length).decode("utf-8") if length else "{}"
+                payload = json.loads(body or "{}")
+                if not isinstance(payload, dict):
+                    raise ValueError("JSON body must be an object")
+                query = payload.get("query") or payload.get("q") or ""
+                top_k = int(payload.get("top_k") or payload.get("topK") or 5)
+            except json.JSONDecodeError:
+                self._json_response({"ok": False, "error": "invalid json", "results": []}, status=400)
+                return
+            except (TypeError, ValueError, OverflowError) as exc:
+                # A bad Content-Length, undecodable bytes, a non-object body or
+                # a non-numeric top_k are client errors, not server faults.
+                self._json_response({"ok": False, "error": str(exc), "results": []}, status=400)
+                return
+            try:
+                result = run_semantic_search(str(query), top_k)
+                self._json_response(result, status=200 if result.get("ok") else 400)
+            except Exception as exc:
+                self._json_response({"ok": False, "error": str(exc), "results": []}, status=500)
+        else:
+            self._json_response({"error": "not found"}, status=404)
+
+    def _json_response(self, data, status=200):
+        body = json.dumps(data, indent=2).encode()
+        self.send_response(status)
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Content-Length", str(len(body)))
+        self.end_headers()
+        self.wfile.write(body)
+
+    def log_message(self, format, *args):
+        # Minimal logging
+        pass
+
+
+def main():
+    # A reindex can run for up to REINDEX_TIMEOUT seconds. Serve each request
+    # on its own thread so /healthz and /reindex/status stay responsive and
+    # _reindex_lock can actually reject an overlapping reindex request.
+    server = http.server.ThreadingHTTPServer(("0.0.0.0", PORT), ReindexHandler)
+    print(f"obsidian-reindex-server listening on 0.0.0.0:{PORT}", flush=True)
+    try:
+        server.serve_forever()
+    except KeyboardInterrupt:
+        pass
+    server.server_close()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/openvino-embeddings-server.py b/scripts/openvino-embeddings-server.py
new file mode 100755
index 0000000..e902b8e
--- /dev/null
+++ b/scripts/openvino-embeddings-server.py
@@ -0,0 +1,249 @@
+#!/usr/bin/env python3
+"""OpenVINO GenAI embedding HTTP service for Will's local swarm stack.
+
+Default port: 18817
+Default model: OpenVINO/bge-base-en-v1.5-int8-ov, cached under ~/.cache/openvino-models/
+Default device: NPU
+
+Exposes a deliberately small compatibility surface:
+  GET  /healthz
+  GET  /api/tags        # Ollama-ish model listing for health scripts
+  GET  /v1/models       # OpenAI-ish model listing
+  POST /api/embed       # Ollama-ish batched embeddings
+  POST /api/embeddings  # Ollama-ish single embedding
+  POST /v1/embeddings   # OpenAI-compatible embeddings response
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import os
+import sys
+import threading
+import time
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from pathlib import Path
+from typing import Any
+
+import openvino as ov
+import openvino_genai as ovg
+
+DEFAULT_MODEL_NAME = "bge-base-en-v1.5-int8-ov"
+DEFAULT_MODEL_DIR = Path.home() / ".cache/openvino-models" / DEFAULT_MODEL_NAME
+DEFAULT_PORT = 18817
+NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
+
+
+def npu_busy_time_us() -> int | None:
+    """Return the NPU busy-time counter read from sysfs, or None if it is unreadable."""
+    try:
+        return int(NPU_BUSY_FILE.read_text().strip())
+    except Exception:
+        return None
+
+
+class EmbeddingService:
+    """Wrap an OpenVINO GenAI text-embedding pipeline and serialize access to it."""
+
+    def __init__(self, model_dir: Path, model_name: str, device: str, max_length: int) -> None:
+        self.model_dir = model_dir
+        self.model_name = model_name
+        self.device = device
+        self.max_length = max_length
+        self.loaded_at = time.time()
+        self.lock = threading.Lock()
+        self.embedding_dim: int | None = None
+
+        if not self.model_dir.exists():
+            raise FileNotFoundError(f"model directory not found: {self.model_dir}")
+
+        core = ov.Core()
+        self.available_devices = list(core.available_devices)
+        if self.device not in self.available_devices:
+            raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}")
+
+        # Intel NPU currently needs static shape for this embedding pipeline.
+        # batch_size=1 is intentional: multi-input requests are served by looping
+        # one text at a time, keeping the model shape acceptable to NPUW.
+        cfg = ovg.TextEmbeddingPipeline.Config()
+        cfg.max_length = int(max_length)
+        cfg.pad_to_max_length = True
+        cfg.batch_size = 1
+        self.pipeline = ovg.TextEmbeddingPipeline(self.model_dir, self.device, cfg)
+
+    def embed_one(self, text: str, *, purpose: str = "query") -> dict[str, Any]:
+        """Embed one non-empty text and return the vector with timing metadata."""
+        text = str(text or "")
+        if not text.strip():
+            raise ValueError("embedding input text is empty")
+        if purpose not in {"query", "document"}:
+            raise ValueError("embedding purpose must be 'query' or 'document'")
+        started = time.perf_counter()
+        # TextEmbeddingPipeline is a native object; serialize calls until proven
+        # safe under concurrent NPU use. Tiny silicon clown-car avoidance clause.
+        with self.lock:
+            # Sample the NPU busy counter while holding the lock so the delta
+            # covers only this request, not work done for concurrent callers.
+            before = npu_busy_time_us()
+            if purpose == "document":
+                # batch_size=1 means embed_documents must receive exactly one doc.
+                vec = self.pipeline.embed_documents([text])[0]
+            else:
+                vec = self.pipeline.embed_query(text)
+            after = npu_busy_time_us()
+        vector = [float(x) for x in vec]
+        self.embedding_dim = len(vector)
+        return {
+            "embedding": vector,
+            "dim": len(vector),
+            "purpose": purpose,
+            "duration_ms": round((time.perf_counter() - started) * 1000, 3),
+            "npu_busy_delta_us": None if before is None or after is None else after - before,
+        }
+
+    def health(self) -> dict[str, Any]:
+        return {
+            "status": "ok",
+            "service": "openvino-embeddings",
+            "model": self.model_name,
+            "model_dir": str(self.model_dir),
+            "device": self.device,
+            "available_devices": self.available_devices,
+            "embedding_dim": self.embedding_dim,
+            "max_length": self.max_length,
+            "uptime_s": round(time.time() - self.loaded_at, 3),
+            "npu_busy_time_us": npu_busy_time_us(),
+        }
+
+
+def normalize_input(value: Any) -> list[str]:
+    """Coerce a string or non-empty list into a list of strings; raise ValueError otherwise."""
+    if isinstance(value, str):
+        return [value]
+    if isinstance(value, list):
+        texts = [str(item) for item in value]
+        if texts:
+            return texts
+    raise ValueError("input must be a non-empty string or list of strings")
+
+
+class Handler(BaseHTTPRequestHandler):
+    """Serve the health, model-listing and embedding endpoints."""
+
+    server_version = "OpenVINOEmbeddings/0.1"
+
+    @property
+    def svc(self) -> EmbeddingService:
+        return self.server.embedding_service  # type: ignore[attr-defined]
+
+    def do_GET(self) -> None:
+        path = self.path.split("?", 1)[0].rstrip("/") or "/"
+        if path in {"/", "/healthz", "/readyz"}:
+            self.write_json(self.svc.health())
+        elif path == "/api/tags":
+            self.write_json({"models": [{"name": self.svc.model_name, "model": self.svc.model_name}]})
+        elif path == "/v1/models":
+            self.write_json({"object": "list", "data": [{"id": self.svc.model_name, "object": "model", "owned_by": "local"}]})
+        else:
+            self.write_json({"error": "not found"}, status=404)
+
+    def do_POST(self) -> None:
+        path = self.path.split("?", 1)[0].rstrip("/") or "/"
+        try:
+            payload = self.read_json()
+            if path == "/api/embed":
+                texts = normalize_input(payload.get("input"))
+                purpose = str(payload.get("purpose") or payload.get("task") or "document")
+                results = [self.svc.embed_one(text, purpose=purpose) for text in texts]
+                self.write_json({
+                    "model": payload.get("model") or self.svc.model_name,
+                    "embeddings": [item["embedding"] for item in results],
+                    "embedding_dim": results[0]["dim"] if results else None,
+                    "purpose": purpose,
+                    "npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
+                    "durations_ms": [item["duration_ms"] for item in results],
+                })
+            elif path == "/api/embeddings":
+                text = payload.get("prompt") or payload.get("input")
+                result = self.svc.embed_one(str(text or ""), purpose="query")
+                self.write_json({
+                    "model": payload.get("model") or self.svc.model_name,
+                    "embedding": result["embedding"],
+                    "embedding_dim": result["dim"],
+                    "npu_busy_delta_us": result["npu_busy_delta_us"],
+                    "duration_ms": result["duration_ms"],
+                })
+            elif path == "/v1/embeddings":
+                texts = normalize_input(payload.get("input"))
+                purpose = str(payload.get("purpose") or payload.get("task") or "query")
+                results = [self.svc.embed_one(text, purpose=purpose) for text in texts]
+                self.write_json({
+                    "object": "list",
+                    "model": payload.get("model") or self.svc.model_name,
+                    "data": [
+                        {"object": "embedding", "index": idx, "embedding": item["embedding"]}
+                        for idx, item in enumerate(results)
+                    ],
+                    "usage": {"prompt_tokens": 0, "total_tokens": 0},
+                    "embedding_dim": results[0]["dim"] if results else None,
+                    "purpose": purpose,
+                    "npu_busy_delta_us": sum((item.get("npu_busy_delta_us") or 0) for item in results),
+                    "durations_ms": [item["duration_ms"] for item in results],
+                })
+            else:
+                self.write_json({"error": "not found"}, status=404)
+        except ValueError as exc:
+            self.write_json({"error": str(exc)}, status=400)
+        except Exception as exc:
+            self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500)
+
+    def read_json(self) -> dict[str, Any]:
+        length = int(self.headers.get("Content-Length") or 0)
+        body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
+        payload = json.loads(body or "{}")
+        if not isinstance(payload, dict):
+            raise ValueError("JSON body must be an object")
+        return payload
+
+    def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
+        body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+        self.send_response(status)
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Content-Length", str(len(body)))
+        self.end_headers()
+        self.wfile.write(body)
+
+    def log_message(self, format: str, *args: Any) -> None:  # noqa: A002 - stdlib override name
+        print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
+
+
+def main() -> int:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--host", default=os.environ.get("OPENVINO_EMBED_HOST", "0.0.0.0"))
+    parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_EMBED_PORT", DEFAULT_PORT)))
+    parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_EMBED_MODEL_DIR", str(DEFAULT_MODEL_DIR)))
+    parser.add_argument("--model-name", default=os.environ.get("OPENVINO_EMBED_MODEL", DEFAULT_MODEL_NAME))
+    parser.add_argument("--device", default=os.environ.get("OPENVINO_EMBED_DEVICE", "NPU"))
+    parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_EMBED_MAX_LENGTH", "512")))
+    args = parser.parse_args()
+
+    service = EmbeddingService(Path(args.model_dir).expanduser(), args.model_name, args.device, args.max_length)
+    httpd = ThreadingHTTPServer((args.host, args.port), Handler)
+    httpd.embedding_service = service  # type: ignore[attr-defined]
+    print(
+        f"openvino-embeddings listening on {args.host}:{args.port} "
+        f"model={args.model_name} device={args.device}",
+        flush=True,
+    )
+    try:
+        httpd.serve_forever()
+    except KeyboardInterrupt:
+        pass
+    finally:
+        # Release the listening socket on every exit path, not just Ctrl-C.
+        httpd.server_close()
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/scripts/rag-embedding-health-server.py b/scripts/rag-embedding-health-server.py
new file mode 100644
index 0000000..24d60b4
--- /dev/null
+++ b/scripts/rag-embedding-health-server.py
@@ -0,0 +1,119 @@
+#!/usr/bin/env python3
+"""RAG/embedding health HTTP wrapper for n8n.
+
+Listens on 0.0.0.0:18814 so the n8n container can call it via
+http://172.19.0.1:18814.
+
+Endpoints:
+  GET  /healthz -> service liveness
+  POST /check   -> run ~/.hermes/scripts/rag_embedding_health.py and return JSON
+"""
+
+from __future__ import annotations
+
+import http.server
+import json
+import os
+import subprocess
+import time
+from pathlib import Path
+
+PORT = int(os.environ.get("PORT", "18814"))
+CHECK_SCRIPT = Path(os.environ.get("RAG_HEALTH_SCRIPT") or Path.home() / ".hermes/scripts/rag_embedding_health.py")
+TIMEOUT = int(os.environ.get("RAG_HEALTH_TIMEOUT", "180"))
+
+
+class Handler(http.server.BaseHTTPRequestHandler):
+    """Serve /healthz and run the RAG health script on POST /check."""
+
+    def do_GET(self):
+        if self.path.split("?", 1)[0].rstrip("/") == "/healthz":
+            self._json({"status": "ok", "service": "rag-embedding-health"})
+        else:
+            self._json({"error": "not found"}, status=404)
+
+    def do_POST(self):
+        if self.path.split("?", 1)[0].rstrip("/") != "/check":
+            self._json({"error": "not found"}, status=404)
+            return
+
+        started = time.monotonic()
+        if not CHECK_SCRIPT.exists():
+            self._json(
+                {
+                    "ok": False,
+                    "status": "failed",
+                    "exitCode": 127,
+                    "output": f"RAG health script missing: {CHECK_SCRIPT}",
+                    "durationMs": 0,
+                },
+                status=200,
+            )
+            return
+
+        env = os.environ.copy()
+        env.setdefault("HERMES_HOME", str(Path.home() / ".hermes"))
+        env.setdefault("OLLAMA_BASE_URL", "http://127.0.0.1:18817")
+        env.setdefault("RAG_EMBED_MODEL", "bge-base-en-v1.5-int8-ov")
+        env.setdefault("N8N_URL", "http://127.0.0.1:18808")
+        env.setdefault("OBSIDIAN_REINDEX_URL", "http://127.0.0.1:18810")
+
+        try:
+            proc = subprocess.run(
+                [str(CHECK_SCRIPT)],
+                text=True,
+                capture_output=True,
+                timeout=TIMEOUT,
+                check=False,
+                env=env,
+            )
+            output = (proc.stdout or proc.stderr or "").strip()
+            self._json(
+                {
+                    "ok": proc.returncode == 0,
+                    "status": "ok" if proc.returncode == 0 else "failed",
+                    "exitCode": proc.returncode,
+                    "output": output[:4000],
+                    "durationMs": int((time.monotonic() - started) * 1000),
+                },
+                status=200,
+            )
+        except subprocess.TimeoutExpired:
+            self._json(
+                {
+                    "ok": False,
+                    "status": "timeout",
+                    "exitCode": 124,
+                    "output": f"RAG/embedding health check timed out after {TIMEOUT}s",
+                    "durationMs": int((time.monotonic() - started) * 1000),
+                },
+                status=200,
+            )
+        except Exception as exc:
+            self._json(
+                {
+                    "ok": False,
+                    "status": "error",
+                    "exitCode": 1,
+                    "output": str(exc)[:4000],
+                    "durationMs": int((time.monotonic() - started) * 1000),
+                },
+                status=200,
+            )
+
+    def _json(self, data, status=200):
+        body = json.dumps(data, indent=2).encode("utf-8")
+        self.send_response(status)
+        self.send_header("Content-Type", "application/json")
+        self.send_header("Content-Length", str(len(body)))
+        self.end_headers()
+        self.wfile.write(body)
+
+    def log_message(self, format, *args):
+        return
+
+
+if __name__ == "__main__":
+    server = http.server.HTTPServer(("0.0.0.0", PORT), Handler)
+    print(f"rag-embedding-health listening on 0.0.0.0:{PORT}", flush=True)
+    server.serve_forever()
diff --git a/swarm-common/obsidian-reindex-endpoint.service b/swarm-common/obsidian-reindex-endpoint.service
new file mode 100644
index 0000000..4928f23
--- /dev/null
+++ b/swarm-common/obsidian-reindex-endpoint.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=Obsidian Vault Reindex Endpoint
+After=network.target
+
+[Service]
+Type=simple
+ExecStart=/usr/bin/python3 /home/will/lab/swarm/scripts/obsidian-reindex-server.py
+Restart=on-failure
+RestartSec=5
+Environment=PORT=18810
+Environment=RAG_COLLECTION=obsidian_bge_npu
+Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
+Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
+
+[Install]
+WantedBy=default.target
diff --git a/swarm-common/openvino-embeddings.service b/swarm-common/openvino-embeddings.service
new file mode 100644
index 0000000..2b595f1
--- /dev/null
+++ b/swarm-common/openvino-embeddings.service
@@ -0,0 +1,19 @@
+[Unit]
+Description=OpenVINO NPU Embeddings HTTP Service (port 18817)
+After=network.target
+
+[Service]
+Type=simple
+WorkingDirectory=/home/will/lab/swarm
+ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/scripts/openvino-embeddings-server.py
+Restart=on-failure
+RestartSec=5
+Environment=OPENVINO_EMBED_PORT=18817
+Environment=OPENVINO_EMBED_HOST=0.0.0.0
+Environment=OPENVINO_EMBED_DEVICE=NPU
+Environment=OPENVINO_EMBED_MODEL=bge-base-en-v1.5-int8-ov
+Environment=OPENVINO_EMBED_MODEL_DIR=/home/will/.cache/openvino-models/bge-base-en-v1.5-int8-ov
+Environment=OPENVINO_EMBED_MAX_LENGTH=512
+
+[Install]
+WantedBy=default.target
diff --git a/swarm-common/rag-embedding-health.service b/swarm-common/rag-embedding-health.service
new file mode 100644
index 0000000..6bf92bc
--- /dev/null
+++ b/swarm-common/rag-embedding-health.service
@@ -0,0 +1,16 @@
+[Unit]
+Description=RAG/Embedding Health HTTP Service (port 18814)
+After=network.target
+
+[Service]
+Type=simple
+ExecStart=/usr/bin/python3 /home/will/lab/swarm/scripts/rag-embedding-health-server.py
+Restart=on-failure
+RestartSec=5
+Environment=PORT=18814
+Environment=RAG_HEALTH_TIMEOUT=180
+Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
+Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
+
+[Install]
+WantedBy=default.target