#!/usr/bin/env python3
"""
Obsidian Vault Reindex Endpoint

Lightweight HTTP server that triggers incremental or full Obsidian vault reindex.
Listens on 0.0.0.0:18810 (configurable via PORT env var).
Called by n8n webhooks or systemd timers.

Endpoints:
  POST /reindex            -> trigger incremental reindex, returns JSON stats
  POST /reindex?full=true  -> trigger full semantic Chroma rebuild
  GET  /reindex/status     -> check last index state
  GET  /semantic-health    -> verify state plus semantic search smoke check
  POST /semantic-search    -> query the Obsidian Chroma semantic index
  GET  /healthz            -> returns ok
"""

import http.server
import json
import os
import subprocess
import sys
import threading
from pathlib import Path
from urllib.parse import parse_qs, urlparse

PORT = int(os.environ.get("PORT", 18810))
REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
REINDEX_SCRIPT = str(
    Path.home() / ".hermes/skills/note-taking/rag-search/scripts/reindex_obsidian.sh"
)
STATE_FILE = (
    Path.home() / ".hermes/data/rag-search/obsidian_index_state.json"
)
SEARCH_SCRIPT = str(Path.home() / ".hermes/skills/note-taking/rag-search/scripts/search.py")
VENV_PYTHON = str(Path.home() / ".hermes/skills/note-taking/rag-search/venv/bin/python")

# Lock to prevent concurrent reindexing
_reindex_lock = threading.Lock()


def run_reindex(full: bool = False) -> dict:
    """Run the reindex script and return its stats as a dict.

    Args:
        full: When true, pass ``--full`` to the script.

    Returns:
        The JSON object printed by the script on stdout (with a
        ``progress_log_tail`` key added when the script wrote to stderr), or a
        dict containing an ``error`` key when the run is already in progress,
        fails, times out, or produces unusable output. Never raises.
    """
    # Non-blocking: a second caller is told immediately instead of queueing.
    if not _reindex_lock.acquire(blocking=False):
        return {"error": "reindex already in progress", "status": "locked"}
    try:
        cmd = [REINDEX_SCRIPT]
        if full:
            cmd.append("--full")
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=REINDEX_TIMEOUT,
        )
        stderr_tail = result.stderr.strip()[-2000:]
        if result.returncode != 0:
            return {
                "error": "reindex failed",
                "exit_code": result.returncode,
                "stderr": stderr_tail,
            }
        try:
            payload = json.loads(result.stdout)
        except json.JSONDecodeError:
            return {
                "error": "invalid json output",
                "stdout": result.stdout.strip()[:500],
                "stderr": stderr_tail,
            }
        if not isinstance(payload, dict):
            # Valid JSON but not an object: callers test `"error" in result`
            # and expect a dict, so report it rather than passing it through.
            return {
                "error": "reindex output is not a JSON object",
                "stdout": result.stdout.strip()[:500],
                "stderr": stderr_tail,
            }
        if stderr_tail:
            payload["progress_log_tail"] = stderr_tail
        return payload
    except subprocess.TimeoutExpired:
        return {"error": f"reindex timed out ({REINDEX_TIMEOUT}s)"}
    except Exception as e:
        # Boundary handler: e.g. the script is missing or not executable.
        return {"error": str(e)}
    finally:
        _reindex_lock.release()


def get_status() -> dict:
    """Read the last index state file.

    Returns:
        The parsed state object, ``{"indexed": False, ...}`` when the file
        does not exist, or a dict with an ``error`` key when the file cannot
        be read or does not contain a JSON object.
    """
    try:
        raw = STATE_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return {"indexed": False, "message": "no state file"}
    except (OSError, UnicodeDecodeError) as e:
        return {"error": str(e)}
    try:
        state = json.loads(raw)
    except json.JSONDecodeError as e:
        return {"error": str(e)}
    if not isinstance(state, dict):
        # Callers use .get() on the result, so only an object is acceptable.
        return {"error": "state file does not contain a JSON object"}
    return state


def run_semantic_search(query: str, top_k: int = 5) -> dict:
    """Query the local Obsidian Chroma index via the rag-search script.

    Args:
        query: Search text; surrounding whitespace is stripped.
        top_k: Requested number of results, clamped to the range 1..20.

    Returns:
        A dict with ``ok``, ``results`` and related fields. ``ok`` is False
        when the query is empty or the script exits non-zero.

    Raises:
        subprocess.TimeoutExpired: The script ran longer than 90 seconds.
        json.JSONDecodeError: The script exited 0 but stdout was not JSON.
    """
    query = (query or "").strip()
    if not query:
        return {"ok": False, "error": "query is required", "results": []}
    top_k = max(1, min(int(top_k or 5), 20))
    # Prefer the skill's virtualenv interpreter; fall back to our own.
    interpreter = VENV_PYTHON if Path(VENV_PYTHON).exists() else sys.executable
    result = subprocess.run(
        [
            interpreter,
            SEARCH_SCRIPT,
            "--index",
            "obsidian",
            "--top-k",
            str(top_k),
            "--raw",
            query,
        ],
        capture_output=True,
        text=True,
        timeout=90,
    )
    if result.returncode != 0:
        return {
            "ok": False,
            "query": query,
            "top_k": top_k,
            "error": result.stderr.strip()[-2000:] or result.stdout.strip()[-2000:],
            "results": [],
        }
    payload = json.loads(result.stdout)
    results = payload.get("results") or []
    return {
        "ok": True,
        "query": query,
        "index": payload.get("index", "obsidian"),
        "top_k": top_k,
        "result_count": len(results),
        "results": results,
    }


def semantic_health() -> dict:
    """Return state plus a tiny semantic-search smoke check.

    The overall ``status`` is ``"ok"`` only when the state file reports
    ``status == "ok"`` with a positive ``vector_count`` and the smoke search
    returns at least one result; otherwise it is ``"degraded"``.
    """
    status = get_status()
    vector_count = status.get("vector_count", 0)
    # Guard the comparison: a missing/None/string count must degrade health,
    # not raise TypeError inside the request handler.
    has_vectors = isinstance(vector_count, (int, float)) and vector_count > 0
    health = {
        "status": "ok" if status.get("status") == "ok" and has_vectors else "degraded",
        "state": {
            k: status.get(k)
            for k in (
                "status",
                "note_count",
                "vector_count",
                "collection",
                "chroma_path",
                "last_full_index",
                "last_incremental_index",
            )
        },
    }
    try:
        payload = run_semantic_search("Obsidian reindex", top_k=1)
        health["search_ok"] = bool(payload.get("results"))
        health["result_count"] = len(payload.get("results", []))
        if not payload.get("ok"):
            health["search_error"] = payload.get("error")
    except Exception as e:
        # Any failure of the smoke search is reported, never propagated.
        health["status"] = "degraded"
        health["search_ok"] = False
        health["search_error"] = str(e)
    if not health.get("search_ok"):
        health["status"] = "degraded"
    return health


class ReindexHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler exposing the reindex, status and search endpoints."""

    def do_GET(self):
        """Serve /healthz, /reindex/status and the semantic-health check."""
        path = urlparse(self.path).path.rstrip("/")
        if path == "/healthz":
            self._json_response({"status": "ok"})
        elif path == "/reindex/status":
            self._json_response(get_status())
        elif path in ("/semantic-health", "/reindex/semantic-health"):
            data = semantic_health()
            self._json_response(data, status=200 if data.get("status") == "ok" else 503)
        else:
            self._json_response({"error": "not found"}, status=404)

    def do_POST(self):
        """Serve POST /reindex and POST /semantic-search."""
        parsed = urlparse(self.path)
        path = parsed.path.rstrip("/")
        if path == "/reindex":
            params = parse_qs(parsed.query)
            full = (params.get("full") or [""])[0].lower() in {"1", "true", "yes"}
            result = run_reindex(full=full)
            status = 200 if "error" not in result else 500
            self._json_response(result, status=status)
        elif path == "/semantic-search":
            self._handle_semantic_search()
        else:
            self._json_response({"error": "not found"}, status=404)

    def _handle_semantic_search(self):
        """Validate the request body, run the search and send the response.

        Client mistakes (bad JSON, wrong types) yield 400; failures while
        running the search itself yield 500.
        """
        try:
            query, top_k = self._parse_search_payload(self._read_json_body())
        except (json.JSONDecodeError, UnicodeDecodeError):
            self._json_response({"ok": False, "error": "invalid json", "results": []}, status=400)
            return
        except ValueError as exc:
            self._json_response({"ok": False, "error": str(exc), "results": []}, status=400)
            return

        # Kept outside the parsing try-block so that bad JSON emitted by the
        # search script is not misreported as a malformed client request.
        try:
            result = run_semantic_search(query, top_k)
        except Exception as exc:
            self._json_response({"ok": False, "error": str(exc), "results": []}, status=500)
            return
        self._json_response(result, status=200 if result.get("ok") else 400)

    def _read_json_body(self):
        """Read and decode the JSON request body; an empty body means ``{}``.

        Raises:
            ValueError: Content-Length is not a non-negative integer.
            UnicodeDecodeError: The body is not valid UTF-8.
            json.JSONDecodeError: The body is not valid JSON.
        """
        try:
            length = int(self.headers.get("Content-Length") or 0)
        except (TypeError, ValueError):
            raise ValueError("invalid Content-Length header") from None
        if length < 0:
            # read(-1) would block until the client closes the connection.
            raise ValueError("invalid Content-Length header")
        body = self.rfile.read(length).decode("utf-8") if length else "{}"
        return json.loads(body or "{}")

    @staticmethod
    def _parse_search_payload(payload):
        """Extract ``(query, top_k)`` from a decoded /semantic-search body.

        Accepts ``query`` or ``q`` for the text and ``top_k`` or ``topK`` for
        the result count (default 5).

        Raises:
            ValueError: The payload is not an object or top_k is not numeric.
        """
        if not isinstance(payload, dict):
            raise ValueError("request body must be a JSON object")
        query = payload.get("query") or payload.get("q") or ""
        raw_top_k = payload.get("top_k") or payload.get("topK") or 5
        try:
            top_k = int(raw_top_k)
        except (TypeError, ValueError, OverflowError):
            raise ValueError("top_k must be an integer") from None
        return str(query), top_k

    def _json_response(self, data, status=200):
        """Send *data* as an indented JSON body with the given HTTP status."""
        body = json.dumps(data, indent=2).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        # Minimal logging: per-request access logs are deliberately suppressed.
        pass


def main():
    """Start the HTTP server and serve until interrupted."""
    # One thread per request: a long-running reindex must not block /healthz,
    # and the non-blocking _reindex_lock only has an effect when requests can
    # actually overlap.
    server = http.server.ThreadingHTTPServer(("0.0.0.0", PORT), ReindexHandler)
    print(f"obsidian-reindex-server listening on 0.0.0.0:{PORT}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()


if __name__ == "__main__":
    main()