From 71f3c05587375a555da92e32a85239d1507806db Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 14:50:41 -0700 Subject: [PATCH] feat(rag): add optional NPU reranker fallback --- docs/swarm-infrastructure.html | 6 +- docs/swarm-infrastructure.md | 17 ++- scripts/obsidian-reindex-server.py | 143 +++++++++++++++++- .../obsidian-reindex-endpoint.service | 8 + tests/test_obsidian_reindex_server.py | 138 +++++++++++++++++ 5 files changed, 303 insertions(+), 9 deletions(-) create mode 100644 tests/test_obsidian_reindex_server.py diff --git a/docs/swarm-infrastructure.html b/docs/swarm-infrastructure.html index edc6862..0158a86 100644 --- a/docs/swarm-infrastructure.html +++ b/docs/swarm-infrastructure.html @@ -87,7 +87,7 @@ VoiceKokoro + Whisper:18805 / :18816 Docker servicesagentmon.monitor=trueswarm/service snapshots OpenClaw VMscurrently dormantopenclaw.snapshot - Obsidian / RAGRAG endpoint :18810Chroma obsidian_bge_npu + Obsidian / RAG:18810 semantic searchNPU embed; optional rerank NPU sidecarsapproved prototypes; not live:18818/:18819/:18820/:18829 @@ -106,10 +106,10 @@

Monitoring model

  • • n8n direct probes critical ports
  • • agentmon aggregates Docker/OpenClaw snapshots
  • • n8n polls agentmon for stale/degraded state
-

Operational endpoints

  • • n8n: 127.0.0.1:18808
  • • agentmon query/UI: 8081 / 8082
  • • live NPU: RAG 18810, Whisper 18816, embeddings 18817
  • • prototypes not live-routed: 18818/18819/18820/18829
+

Operational endpoints

  • • n8n: 127.0.0.1:18808
  • • agentmon query/UI: 8081 / 8082
  • • live NPU: RAG 18810, Whisper 18816, embeddings 18817
  • • optional disabled rerank hook: 18818
  • • prototypes not live-routed: 18819/18820/18829

Source paths

  • • Swarm repo: ~/lab/swarm
  • • Agentmon repo: ~/lab/agentmon
  • • Workflows: swarm-common/n8n-workflows
- + diff --git a/docs/swarm-infrastructure.md b/docs/swarm-infrastructure.md index dd47587..06fc0fd 100644 --- a/docs/swarm-infrastructure.md +++ b/docs/swarm-infrastructure.md @@ -126,7 +126,7 @@ Host/user services: - `ollama.service` — `:18807`, legacy/CPU embeddings API fallback - `openvino-embeddings.service` — `:18817`, OpenVINO NPU embeddings API (`/v1/embeddings`, `/api/embed`, `/api/embeddings`) - `docker-health-endpoint.service` — `:18809`, read-only container health for n8n -- `obsidian-reindex-endpoint.service` — `:18810`, Obsidian/RAG reindex trigger; default collection `obsidian_bge_npu` using OpenVINO NPU embeddings +- `obsidian-reindex-endpoint.service` — `:18810`, Obsidian/RAG reindex trigger and `/semantic-search`; default collection `obsidian_bge_npu` using OpenVINO NPU embeddings, with optional request-time `:18818` reranking disabled by default - `url-content-extractor.service` — `:18812`, YouTube/PDF/web extraction - `voice-memo-processor.service` — `:18813`, voice memo processing - `rag-embedding-health.service` — `:18814`, RAG/embedding health wrapper @@ -159,7 +159,8 @@ RAG/vector store: - Reindex state/progress: active BGE/NPU state in `~/.hermes/data/rag-search/obsidian_bge_npu_index_state.json` and `obsidian_bge_npu_reindex_progress.json`; legacy Ollama state in `obsidian_index_state.json` remains for comparison/fallback. - Active RAG query/reindex embedding backend: OpenVINO NPU embeddings service on `:18817`, currently `bge-base-en-v1.5-int8-ov`, collection `obsidian_bge_npu`. - Legacy comparison/fallback collection: `obsidian`, built with Ollama on `:18807` using `nomic-embed-text`. -- Reindex endpoint: `POST :18810/reindex` for incremental updates, `POST :18810/reindex?full=true` for full semantic rebuilds, `GET :18810/semantic-health` to verify vectors plus a search smoke test. 
+- Reindex/search endpoint: `POST :18810/reindex` for incremental updates, `POST :18810/reindex?full=true` for full semantic rebuilds, `GET :18810/semantic-health` to verify vectors plus a search smoke test, and `POST :18810/semantic-search` for n8n/Hermes semantic context lookup. +- Optional reranker path: `RAG_RERANK_ENABLED=false` by default. When enabled, `/semantic-search` retrieves `RAG_RERANK_INITIAL_K` vector candidates, calls `RAG_RERANK_URL` (`http://127.0.0.1:18818/rerank` by default), returns reranked `RAG_RERANK_TOP_K`, requires positive `npu_busy_delta_us` by default (`RAG_RERANK_REQUIRE_NPU_PROOF=true`), and falls back to vector order with `rerank.error` metadata on timeout/error/non-positive NPU proof. Reranking is request-time only and must not mutate Chroma/vector collections. ## Monitoring model @@ -214,6 +215,11 @@ cd /home/will/lab/swarm make status make local-ai-health ./scripts/npu-service-health.sh # read-only; includes sysfs busy-time proof for :18817 +curl -fsS http://127.0.0.1:18810/semantic-health | jq '{status,state,search_ok,result_count}' +curl -fsS http://127.0.0.1:18810/semantic-search \ + -H 'Content-Type: application/json' \ + -d '{"query":"non-private semantic smoke","top_k":2}' \ + | jq '{ok,index,top_k,search_k,rerank,result_count}' curl -fsS http://127.0.0.1:18808/healthz curl -fsS http://127.0.0.1:8081/healthz curl -fsS 'http://127.0.0.1:8081/v1/events?event_type=swarm.snapshot&limit=1' | jq . 
@@ -223,8 +229,9 @@ From inside `n8n-agent`: ```bash docker exec n8n-agent /bin/sh -lc ' - wget -qO- -T 5 http://172.19.0.1:8081/healthz - wget -qO- -T 5 "http://172.19.0.1:8081/v1/events?event_type=swarm.snapshot&limit=1" | head -c 500 + wget -qO- -T 5 http://172.19.0.1:18810/healthz + wget -qO- -T 5 http://172.19.0.1:18814/healthz + wget -qO- -T 5 http://172.19.0.1:18817/healthz | head -c 500 ' ``` @@ -247,4 +254,4 @@ jq '.[0] | {id,name,active,nodes:(.nodes|length)}' /tmp/agentmon-export.json - From `n8n-agent`, use `127.0.0.1:5678` for n8n itself and `172.19.0.1:` for host-published swarm services. - Agentmon `/healthz` only proves the web/API process is alive; pair it with snapshot freshness to prove the monitoring pipeline is flowing. - OpenClaw is intentionally dormant unless explicitly re-enabled; do not alert on VMs being shut off by default. -- OpenVINO NPU sidecars on `:18818`, `:18819`, `:18820`, and optional `:18829` are prototypes/not-live unless a later approved change installs and routes them. Do not draw live Atlas/Hermes/RAG arrows to them in diagrams until that approval and implementation actually exist. +- OpenVINO NPU sidecars on `:18819`, `:18820`, and optional `:18829` are prototypes/not-live unless a later approved change installs and routes them. The `:18818` reranker is also a prototype service, but `:18810/semantic-search` now has a disabled-by-default request-time rerank hook that falls back safely when `:18818` is unavailable. Do not draw live Atlas/Hermes/classifier/GenAI arrows to prototypes until approval and implementation actually exist. 
diff --git a/scripts/obsidian-reindex-server.py b/scripts/obsidian-reindex-server.py
index 6133b8c..1cbf422 100644
--- a/scripts/obsidian-reindex-server.py
+++ b/scripts/obsidian-reindex-server.py
@@ -21,14 +21,33 @@ import os
 import subprocess
 import sys
 import threading
+import time
+from http.client import HTTPException
 from pathlib import Path
 from urllib.parse import parse_qs, urlparse
+from urllib import request, error
 
 PORT = int(os.environ.get("PORT", 18810))
 REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
 RAG_COLLECTION = os.environ.get("RAG_COLLECTION", "obsidian").strip() or "obsidian"
 RAG_EMBED_MODEL = os.environ.get("RAG_EMBED_MODEL", "nomic-embed-text").strip() or "nomic-embed-text"
 OLLAMA_BASE_URL = (os.environ.get("OLLAMA_BASE_URL") or "http://127.0.0.1:18807").rstrip("/")
+RAG_RERANK_ENABLED = (os.environ.get("RAG_RERANK_ENABLED") or "false").strip().lower() in {
+    "1",
+    "true",
+    "yes",
+    "on",
+}
+RAG_RERANK_URL = (os.environ.get("RAG_RERANK_URL") or "http://127.0.0.1:18818/rerank").strip()
+RAG_RERANK_INITIAL_K = max(1, int(os.environ.get("RAG_RERANK_INITIAL_K") or "20"))
+RAG_RERANK_TOP_K = max(1, int(os.environ.get("RAG_RERANK_TOP_K") or "5"))
+RAG_RERANK_TIMEOUT_MS = max(1, int(os.environ.get("RAG_RERANK_TIMEOUT_MS") or "3000"))
+RAG_RERANK_REQUIRE_NPU_PROOF = (os.environ.get("RAG_RERANK_REQUIRE_NPU_PROOF") or "true").strip().lower() in {
+    "1",
+    "true",
+    "yes",
+    "on",
+}
 
 REINDEX_SCRIPT = str(
     Path.home()
@@ -102,12 +121,150 @@ def get_status() -> dict:
         return {"error": str(e)}
 
 
+def _result_text(result: dict) -> str:
+    """Return the text field sent to the reranker without changing response shape."""
+    return str(result.get("text") or result.get("content") or "")
+
+
+def _apply_rerank(query: str, results: list[dict], final_k: int) -> tuple[list[dict], dict]:
+    """Optionally rerank semantic results, falling back to vector order on any error.
+
+    Returns ``(results, metadata)``. ``results`` holds at most ``final_k`` items:
+    reranked copies carrying ``rerank_score``/``rerank_rank`` on success, otherwise
+    the leading vector-ordered items. ``metadata`` records whether a rerank was
+    attempted, whether it succeeded, and the ``reason``/``error`` when it did not.
+    Transport failures and malformed reranker payloads both take the fallback path.
+    """
+    metadata = {
+        "enabled": RAG_RERANK_ENABLED,
+        "attempted": False,
+        "ok": False,
+        "url": RAG_RERANK_URL,
+        "initial_k": len(results),
+        "top_k": final_k,
+    }
+    if not RAG_RERANK_ENABLED:
+        metadata["ok"] = True
+        metadata["reason"] = "disabled"
+        return results[:final_k], metadata
+    if not results:
+        metadata["ok"] = True
+        metadata["reason"] = "no_results"
+        return [], metadata
+
+    metadata["attempted"] = True
+    documents = []
+    submitted = []  # source items in the same order as ``documents``
+    for idx, item in enumerate(results):
+        text = _result_text(item)
+        if not text:
+            continue
+        submitted.append(item)
+        documents.append(
+            {
+                "id": str(item.get("id") or idx),
+                "text": text,
+                "metadata": {
+                    "index": idx,
+                    "path": item.get("path"),
+                    "source": item.get("source"),
+                    "chunk": item.get("chunk"),
+                },
+            }
+        )
+    if not documents:
+        metadata["ok"] = True
+        metadata["reason"] = "no_text_documents"
+        return results[:final_k], metadata
+
+    started = time.monotonic()
+    try:
+        body = json.dumps(
+            {
+                "query": query,
+                "documents": documents,
+                "top_k": final_k,
+                "return_documents": False,
+            }
+        ).encode("utf-8")
+        req = request.Request(
+            RAG_RERANK_URL,
+            data=body,
+            headers={"Content-Type": "application/json"},
+            method="POST",
+        )
+        with request.urlopen(req, timeout=RAG_RERANK_TIMEOUT_MS / 1000.0) as resp:
+            payload = json.loads(resp.read().decode("utf-8"))
+    # ValueError covers JSONDecodeError and UnicodeDecodeError; HTTPException covers
+    # truncated or malformed HTTP responses that are not OSError subclasses.
+    except (OSError, ValueError, HTTPException, error.URLError) as exc:
+        metadata["duration_ms"] = round((time.monotonic() - started) * 1000, 2)
+        metadata["error"] = f"{type(exc).__name__}: {exc}"
+        return results[:final_k], metadata
+
+    metadata["duration_ms"] = round((time.monotonic() - started) * 1000, 2)
+    if not isinstance(payload, dict):
+        metadata["error"] = f"reranker returned {type(payload).__name__}, expected a JSON object"
+        return results[:final_k], metadata
+    metadata["ok"] = bool(payload.get("ok", True))
+    metadata["model"] = payload.get("model")
+    metadata["device"] = payload.get("device")
+    metadata["npu_busy_delta_us"] = payload.get("npu_busy_delta_us")
+    metadata["require_npu_proof"] = RAG_RERANK_REQUIRE_NPU_PROOF
+    metadata["input_count"] = payload.get("input_count")
+    ranked = payload.get("results")
+    if not isinstance(ranked, list):
+        ranked = []
+    try:
+        npu_busy_delta_us = int(payload.get("npu_busy_delta_us") or 0)
+    except (TypeError, ValueError, OverflowError):
+        npu_busy_delta_us = 0  # unparsable proof counts as no proof
+    if RAG_RERANK_REQUIRE_NPU_PROOF and npu_busy_delta_us <= 0:
+        metadata["ok"] = False
+        metadata["error"] = "reranker response lacked positive npu_busy_delta_us"
+        return results[:final_k], metadata
+    if not metadata["ok"] or not ranked:
+        # Falling back is never a success, even when the reranker said ok with no results.
+        metadata["ok"] = False
+        metadata["error"] = payload.get("error") or "reranker returned no ranked results"
+        return results[:final_k], metadata
+
+    by_id = {str(item.get("id") or idx): item for idx, item in enumerate(results)}
+    reranked = []
+    for rank, ranked_item in enumerate(ranked):
+        if not isinstance(ranked_item, dict):
+            continue
+        source_item = None
+        if "id" in ranked_item:
+            source_item = by_id.get(str(ranked_item.get("id")))
+        position = ranked_item.get("index")
+        # A positional index can only refer to the documents actually submitted,
+        # which skips results without text.
+        if source_item is None and isinstance(position, int) and 0 <= position < len(submitted):
+            source_item = submitted[position]
+        if source_item is None:
+            continue
+        merged = dict(source_item)
+        merged["rerank_score"] = ranked_item.get("score")
+        merged["rerank_rank"] = rank + 1
+        reranked.append(merged)
+        if len(reranked) >= final_k:
+            break
+    if not reranked:
+        metadata["ok"] = False
+        metadata["error"] = "reranker result IDs did not match search results"
+        return results[:final_k], metadata
+    return reranked, metadata
+
+
 def run_semantic_search(query: str, top_k: int = 5) -> dict:
     """Query the local Obsidian Chroma index via the rag-search script."""
     query = (query or "").strip()
     if not query:
         return {"ok": False, "error": "query is required", "results": []}
     top_k = max(1, min(int(top_k or 5), 20))
+    search_k = max(top_k, min(RAG_RERANK_INITIAL_K, 100)) if RAG_RERANK_ENABLED else top_k
+    final_k = min(top_k, RAG_RERANK_TOP_K) if RAG_RERANK_ENABLED else top_k
     env = os.environ.copy()
     env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
     env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
@@ -119,7 +276,7 @@ def run_semantic_search(query: str, top_k: int = 5) -> dict:
             "--index",
             RAG_COLLECTION,
             "--top-k",
-            str(top_k),
+            str(search_k),
             "--raw",
             query,
         ],
@@ -133,17 +290,27 @@ def run_semantic_search(query: str, top_k: int = 5) -> dict:
             "ok": False,
             "query": query,
             "top_k": top_k,
+            "search_k": search_k,
             "error": result.stderr.strip()[-2000:] or result.stdout.strip()[-2000:],
             "results": [],
+            "rerank": {
+                "enabled": RAG_RERANK_ENABLED,
+                "attempted": False,
+                "ok": False,
+                "error": "vector search failed before rerank",
+            },
         }
     payload = json.loads(result.stdout)
     results = payload.get("results") or []
+    results, rerank_meta = _apply_rerank(query, results, final_k)
     return {
         "ok": True,
         "query": query,
         "index": payload.get("index", RAG_COLLECTION),
         "top_k": top_k,
+        "search_k": search_k,
         "result_count": len(results),
+        "rerank": rerank_meta,
         "results": results,
     }
 
diff --git a/swarm-common/obsidian-reindex-endpoint.service b/swarm-common/obsidian-reindex-endpoint.service
index 4928f23..b74ef17 100644
--- a/swarm-common/obsidian-reindex-endpoint.service
+++ b/swarm-common/obsidian-reindex-endpoint.service
@@ -11,6 +11,14 @@ Environment=PORT=18810
 Environment=RAG_COLLECTION=obsidian_bge_npu
 Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
 Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
+# Optional request-time second-stage reranking. Disabled by default so :18810
+# keeps working when the :18818 prototype is stopped or not yet approved live.
+Environment=RAG_RERANK_ENABLED=false
+Environment=RAG_RERANK_URL=http://127.0.0.1:18818/rerank
+Environment=RAG_RERANK_INITIAL_K=20
+Environment=RAG_RERANK_TOP_K=5
+Environment=RAG_RERANK_TIMEOUT_MS=3000
+Environment=RAG_RERANK_REQUIRE_NPU_PROOF=true
 
 [Install]
 WantedBy=default.target
diff --git a/tests/test_obsidian_reindex_server.py b/tests/test_obsidian_reindex_server.py
new file mode 100644
index 0000000..b1fb38a
--- /dev/null
+++ b/tests/test_obsidian_reindex_server.py
@@ -0,0 +1,170 @@
+import importlib.util
+import json
+import subprocess
+import sys
+import types
+import unittest
+from pathlib import Path
+from typing import cast
+from unittest import mock
+
+MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "obsidian-reindex-server.py"
+
+
+def load_module():
+    spec = importlib.util.spec_from_file_location("obsidian_reindex_server", MODULE_PATH)
+    assert spec is not None and spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[spec.name] = module
+    spec.loader.exec_module(module)
+    return cast(types.ModuleType, module)
+
+
+class SemanticSearchRerankTests(unittest.TestCase):
+    def setUp(self):
+        self.server = load_module()
+        self.results = [
+            {"id": "a", "text": "alpha doc", "path": "a.md", "score": 0.1},
+            {"id": "b", "text": "beta doc", "path": "b.md", "score": 0.2},
+            {"id": "c", "text": "gamma doc", "path": "c.md", "score": 0.3},
+        ]
+
+    def _mock_search_run(self, expected_top_k=None):
+        def fake_run(cmd, capture_output, text, timeout, env):
+            if expected_top_k is not None:
+                self.assertEqual(cmd[cmd.index("--top-k") + 1], str(expected_top_k))
+            return subprocess.CompletedProcess(
+                cmd,
+                0,
+                stdout=json.dumps({"index": "obsidian_bge_npu", "results": self.results}),
+                stderr="",
+            )
+
+        return fake_run
+
+    def test_disabled_rerank_preserves_vector_order(self):
+        setattr(self.server, "RAG_RERANK_ENABLED", False)
+        with mock.patch.object(self.server.subprocess, "run", self._mock_search_run(expected_top_k=2)):
+            payload = self.server.run_semantic_search("npu smoke", top_k=2)
+        self.assertTrue(payload["ok"])
+        self.assertEqual(payload["search_k"], 2)
+        self.assertEqual([item["id"] for item in payload["results"]], ["a", "b"])
+        self.assertEqual(payload["rerank"]["reason"], "disabled")
+        self.assertFalse(payload["rerank"]["attempted"])
+
+    def test_enabled_rerank_reorders_matching_results(self):
+        setattr(self.server, "RAG_RERANK_ENABLED", True)
+        setattr(self.server, "RAG_RERANK_INITIAL_K", 3)
+        setattr(self.server, "RAG_RERANK_TOP_K", 2)
+
+        class FakeResponse:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, *args):
+                return False
+
+            def read(self):
+                return json.dumps(
+                    {
+                        "ok": True,
+                        "model": "synthetic-reranker",
+                        "device": "NPU",
+                        "npu_busy_delta_us": 123,
+                        "results": [
+                            {"id": "c", "score": 9.0},
+                            {"id": "a", "score": 7.0},
+                        ],
+                    }
+                ).encode()
+
+        with mock.patch.object(self.server.subprocess, "run", self._mock_search_run(expected_top_k=3)), mock.patch.object(
+            self.server.request, "urlopen", return_value=FakeResponse()
+        ):
+            payload = self.server.run_semantic_search("npu smoke", top_k=2)
+        self.assertEqual([item["id"] for item in payload["results"]], ["c", "a"])
+        self.assertTrue(payload["rerank"]["attempted"])
+        self.assertTrue(payload["rerank"]["ok"])
+        self.assertEqual(payload["rerank"]["npu_busy_delta_us"], 123)
+        self.assertEqual(payload["results"][0]["rerank_rank"], 1)
+
+    def test_enabled_rerank_error_falls_back_to_vector_order(self):
+        setattr(self.server, "RAG_RERANK_ENABLED", True)
+        setattr(self.server, "RAG_RERANK_INITIAL_K", 3)
+        setattr(self.server, "RAG_RERANK_TOP_K", 2)
+        with mock.patch.object(self.server.subprocess, "run", self._mock_search_run(expected_top_k=3)), mock.patch.object(
+            self.server.request, "urlopen", side_effect=OSError("reranker unavailable")
+        ):
+            payload = self.server.run_semantic_search("npu smoke", top_k=2)
+        self.assertEqual([item["id"] for item in payload["results"]], ["a", "b"])
+        self.assertTrue(payload["rerank"]["attempted"])
+        self.assertFalse(payload["rerank"]["ok"])
+        self.assertIn("reranker unavailable", payload["rerank"]["error"])
+
+    def test_enabled_rerank_requires_positive_npu_proof(self):
+        setattr(self.server, "RAG_RERANK_ENABLED", True)
+        setattr(self.server, "RAG_RERANK_INITIAL_K", 3)
+        setattr(self.server, "RAG_RERANK_TOP_K", 2)
+        setattr(self.server, "RAG_RERANK_REQUIRE_NPU_PROOF", True)
+
+        class FakeResponse:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, *args):
+                return False
+
+            def read(self):
+                return json.dumps(
+                    {
+                        "ok": True,
+                        "device": "NPU",
+                        "npu_busy_delta_us": 0,
+                        "results": [{"id": "c", "score": 9.0}],
+                    }
+                ).encode()
+
+        with mock.patch.object(self.server.subprocess, "run", self._mock_search_run(expected_top_k=3)), mock.patch.object(
+            self.server.request, "urlopen", return_value=FakeResponse()
+        ):
+            payload = self.server.run_semantic_search("npu smoke", top_k=2)
+        self.assertEqual([item["id"] for item in payload["results"]], ["a", "b"])
+        self.assertTrue(payload["rerank"]["attempted"])
+        self.assertFalse(payload["rerank"]["ok"])
+        self.assertIn("positive npu_busy_delta_us", payload["rerank"]["error"])
+
+    def test_enabled_rerank_malformed_payload_falls_back_to_vector_order(self):
+        setattr(self.server, "RAG_RERANK_ENABLED", True)
+        setattr(self.server, "RAG_RERANK_INITIAL_K", 3)
+        setattr(self.server, "RAG_RERANK_TOP_K", 2)
+        setattr(self.server, "RAG_RERANK_REQUIRE_NPU_PROOF", True)
+
+        class FakeResponse:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, *args):
+                return False
+
+            def read(self):
+                return json.dumps(
+                    {
+                        "ok": True,
+                        "device": "NPU",
+                        "npu_busy_delta_us": "not-a-number",
+                        "results": [{"id": "c", "score": 9.0}],
+                    }
+                ).encode()
+
+        with mock.patch.object(self.server.subprocess, "run", self._mock_search_run(expected_top_k=3)), mock.patch.object(
+            self.server.request, "urlopen", return_value=FakeResponse()
+        ):
+            payload = self.server.run_semantic_search("npu smoke", top_k=2)
+        self.assertEqual([item["id"] for item in payload["results"]], ["a", "b"])
+        self.assertTrue(payload["rerank"]["attempted"])
+        self.assertFalse(payload["rerank"]["ok"])
+        self.assertIn("positive npu_busy_delta_us", payload["rerank"]["error"])
+
+
+if __name__ == "__main__":
+    unittest.main()