Add semantic health to Obsidian reindex endpoint
This commit is contained in:
@@ -142,7 +142,9 @@ Local REST API:
|
|||||||
RAG/vector store:
|
RAG/vector store:
|
||||||
|
|
||||||
- ChromaDB path: `~/.hermes/data/rag-search/chroma/`
|
- ChromaDB path: `~/.hermes/data/rag-search/chroma/`
|
||||||
|
- Reindex state/progress: `~/.hermes/data/rag-search/obsidian_index_state.json` and `obsidian_reindex_progress.json`
|
||||||
- Embeddings backend: Ollama on `:18807`, normally `nomic-embed-text`
|
- Embeddings backend: Ollama on `:18807`, normally `nomic-embed-text`
|
||||||
|
- Reindex endpoint: `POST :18810/reindex` for incremental updates, `POST :18810/reindex?full=true` for full semantic rebuilds, `GET :18810/semantic-health` to verify vectors plus a search smoke test.
|
||||||
|
|
||||||
## Monitoring model
|
## Monitoring model
|
||||||
|
|
||||||
|
|||||||
@@ -1,15 +1,17 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""
|
"""
|
||||||
Obsidian Vault Reindex Endpoint
|
Obsidian Vault Reindex Endpoint
|
||||||
Lightweight HTTP server that triggers an incremental Obsidian vault reindex.
|
Lightweight HTTP server that triggers incremental or full Obsidian vault reindex.
|
||||||
|
|
||||||
Listens on 0.0.0.0:18810 (configurable via PORT env var).
|
Listens on 0.0.0.0:18810 (configurable via PORT env var).
|
||||||
Called by n8n webhooks or systemd timers.
|
Called by n8n webhooks or systemd timers.
|
||||||
|
|
||||||
Endpoints:
|
Endpoints:
|
||||||
POST /reindex -> trigger incremental reindex, returns JSON stats
|
POST /reindex -> trigger incremental reindex, returns JSON stats
|
||||||
GET /reindex/status -> check last index state
|
POST /reindex?full=true -> trigger full semantic Chroma rebuild
|
||||||
GET /healthz -> returns ok
|
GET /reindex/status -> check last index state
|
||||||
|
GET /semantic-health -> verify state plus semantic search smoke check
|
||||||
|
GET /healthz -> returns ok
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import http.server
|
import http.server
|
||||||
@@ -19,8 +21,10 @@ import subprocess
|
|||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
from urllib.parse import parse_qs, urlparse
|
||||||
|
|
||||||
PORT = int(os.environ.get("PORT", 18810))
|
PORT = int(os.environ.get("PORT", 18810))
|
||||||
|
REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
|
||||||
|
|
||||||
REINDEX_SCRIPT = str(
|
REINDEX_SCRIPT = str(
|
||||||
Path.home()
|
Path.home()
|
||||||
@@ -29,38 +33,47 @@ REINDEX_SCRIPT = str(
|
|||||||
STATE_FILE = (
|
STATE_FILE = (
|
||||||
Path.home() / ".hermes/data/rag-search/obsidian_index_state.json"
|
Path.home() / ".hermes/data/rag-search/obsidian_index_state.json"
|
||||||
)
|
)
|
||||||
|
SEARCH_SCRIPT = str(Path.home() / ".hermes/skills/note-taking/rag-search/scripts/search.py")
|
||||||
|
VENV_PYTHON = str(Path.home() / ".hermes/skills/note-taking/rag-search/venv/bin/python")
|
||||||
|
|
||||||
# Lock to prevent concurrent reindexing
|
# Lock to prevent concurrent reindexing
|
||||||
_reindex_lock = threading.Lock()
|
_reindex_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
def run_reindex() -> dict:
|
def run_reindex(full: bool = False) -> dict:
|
||||||
"""Run the incremental reindex script. Returns stats dict."""
|
"""Run the reindex script. Returns stats dict."""
|
||||||
if not _reindex_lock.acquire(blocking=False):
|
if not _reindex_lock.acquire(blocking=False):
|
||||||
return {"error": "reindex already in progress", "status": "locked"}
|
return {"error": "reindex already in progress", "status": "locked"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
cmd = [REINDEX_SCRIPT]
|
||||||
|
if full:
|
||||||
|
cmd.append("--full")
|
||||||
result = subprocess.run(
|
result = subprocess.run(
|
||||||
[REINDEX_SCRIPT],
|
cmd,
|
||||||
capture_output=True,
|
capture_output=True,
|
||||||
text=True,
|
text=True,
|
||||||
timeout=600, # 10 min max for full reindex
|
timeout=REINDEX_TIMEOUT,
|
||||||
)
|
)
|
||||||
if result.returncode != 0:
|
if result.returncode != 0:
|
||||||
return {
|
return {
|
||||||
"error": "reindex failed",
|
"error": "reindex failed",
|
||||||
"exit_code": result.returncode,
|
"exit_code": result.returncode,
|
||||||
"stderr": result.stderr.strip()[:500],
|
"stderr": result.stderr.strip()[-2000:],
|
||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
return json.loads(result.stdout)
|
payload = json.loads(result.stdout)
|
||||||
|
if result.stderr.strip():
|
||||||
|
payload["progress_log_tail"] = result.stderr.strip()[-2000:]
|
||||||
|
return payload
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
return {
|
return {
|
||||||
"error": "invalid json output",
|
"error": "invalid json output",
|
||||||
"stdout": result.stdout.strip()[:500],
|
"stdout": result.stdout.strip()[:500],
|
||||||
|
"stderr": result.stderr.strip()[-2000:],
|
||||||
}
|
}
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
return {"error": "reindex timed out (600s)"}
|
return {"error": f"reindex timed out ({REINDEX_TIMEOUT}s)"}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
finally:
|
finally:
|
||||||
@@ -77,21 +90,77 @@ def get_status() -> dict:
|
|||||||
return {"error": str(e)}
|
return {"error": str(e)}
|
||||||
|
|
||||||
|
|
||||||
|
def semantic_health() -> dict:
|
||||||
|
"""Return state plus a tiny semantic-search smoke check."""
|
||||||
|
status = get_status()
|
||||||
|
health = {
|
||||||
|
"status": "ok" if status.get("status") == "ok" and status.get("vector_count", 0) > 0 else "degraded",
|
||||||
|
"state": {
|
||||||
|
k: status.get(k)
|
||||||
|
for k in (
|
||||||
|
"status",
|
||||||
|
"note_count",
|
||||||
|
"vector_count",
|
||||||
|
"collection",
|
||||||
|
"chroma_path",
|
||||||
|
"last_full_index",
|
||||||
|
"last_incremental_index",
|
||||||
|
)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
result = subprocess.run(
|
||||||
|
[
|
||||||
|
VENV_PYTHON if Path(VENV_PYTHON).exists() else sys.executable,
|
||||||
|
SEARCH_SCRIPT,
|
||||||
|
"--index",
|
||||||
|
"obsidian",
|
||||||
|
"--top-k",
|
||||||
|
"1",
|
||||||
|
"--raw",
|
||||||
|
"Obsidian reindex",
|
||||||
|
],
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=90,
|
||||||
|
)
|
||||||
|
if result.returncode == 0:
|
||||||
|
payload = json.loads(result.stdout)
|
||||||
|
health["search_ok"] = bool(payload.get("results"))
|
||||||
|
health["result_count"] = len(payload.get("results", []))
|
||||||
|
else:
|
||||||
|
health["status"] = "degraded"
|
||||||
|
health["search_ok"] = False
|
||||||
|
health["search_error"] = result.stderr.strip()[-1000:] or result.stdout.strip()[-1000:]
|
||||||
|
except Exception as e:
|
||||||
|
health["status"] = "degraded"
|
||||||
|
health["search_ok"] = False
|
||||||
|
health["search_error"] = str(e)
|
||||||
|
if not health.get("search_ok"):
|
||||||
|
health["status"] = "degraded"
|
||||||
|
return health
|
||||||
|
|
||||||
|
|
||||||
class ReindexHandler(http.server.BaseHTTPRequestHandler):
|
class ReindexHandler(http.server.BaseHTTPRequestHandler):
|
||||||
def do_GET(self):
|
def do_GET(self):
|
||||||
path = self.path.rstrip("/")
|
path = urlparse(self.path).path.rstrip("/")
|
||||||
if path == "/healthz":
|
if path == "/healthz":
|
||||||
self._json_response({"status": "ok"})
|
self._json_response({"status": "ok"})
|
||||||
elif path == "/reindex/status":
|
elif path == "/reindex/status":
|
||||||
self._json_response(get_status())
|
self._json_response(get_status())
|
||||||
|
elif path in ("/semantic-health", "/reindex/semantic-health"):
|
||||||
|
data = semantic_health()
|
||||||
|
self._json_response(data, status=200 if data.get("status") == "ok" else 503)
|
||||||
else:
|
else:
|
||||||
self._json_response({"error": "not found"}, status=404)
|
self._json_response({"error": "not found"}, status=404)
|
||||||
|
|
||||||
def do_POST(self):
|
def do_POST(self):
|
||||||
path = self.path.rstrip("/")
|
parsed = urlparse(self.path)
|
||||||
|
path = parsed.path.rstrip("/")
|
||||||
if path == "/reindex":
|
if path == "/reindex":
|
||||||
# Run in background thread so we can respond
|
params = parse_qs(parsed.query)
|
||||||
result = run_reindex()
|
full = (params.get("full") or [""])[0].lower() in {"1", "true", "yes"}
|
||||||
|
result = run_reindex(full=full)
|
||||||
status = 200 if "error" not in result else 500
|
status = 200 if "error" not in result else 500
|
||||||
self._json_response(result, status=status)
|
self._json_response(result, status=status)
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user