feat(rag): add optional NPU reranker fallback

This commit is contained in:
William Valentin
2026-06-04 14:50:41 -07:00
parent 06f235d26b
commit 71f3c05587
5 changed files with 303 additions and 9 deletions
+3 -3
View File
@@ -87,7 +87,7 @@
<g><rect x="965" y="385" width="210" height="80" rx="9" fill="#0f172a"/><rect x="965" y="385" width="210" height="80" rx="9" fill="rgba(8,51,68,.4)" stroke="#22d3ee" stroke-width="1.6"/><text x="1070" y="415" text-anchor="middle" class="title">Voice</text><text x="1070" y="436" text-anchor="middle" class="tiny">Kokoro + Whisper</text><text x="1070" y="454" text-anchor="middle" class="port">:18805 / :18816</text></g>
<g><rect x="965" y="555" width="210" height="80" rx="9" fill="#0f172a"/><rect x="965" y="555" width="210" height="80" rx="9" fill="rgba(76,29,149,.4)" stroke="#a78bfa" stroke-width="1.6"/><text x="1070" y="585" text-anchor="middle" class="title">Docker services</text><text x="1070" y="606" text-anchor="middle" class="tiny">agentmon.monitor=true</text><text x="1070" y="624" text-anchor="middle" class="port">swarm/service snapshots</text></g>
<g><rect x="965" y="665" width="210" height="80" rx="9" fill="#0f172a"/><rect x="965" y="665" width="210" height="80" rx="9" fill="rgba(120,53,15,.3)" stroke="#fbbf24" stroke-width="1.6"/><text x="1070" y="695" text-anchor="middle" class="title">OpenClaw VMs</text><text x="1070" y="716" text-anchor="middle" class="tiny">currently dormant</text><text x="1070" y="734" text-anchor="middle" class="port">openclaw.snapshot</text></g>
<g><rect x="965" y="775" width="210" height="75" rx="9" fill="#0f172a"/><rect x="965" y="775" width="210" height="75" rx="9" fill="rgba(76,29,149,.4)" stroke="#a78bfa" stroke-width="1.6"/><text x="1070" y="802" text-anchor="middle" class="title">Obsidian / RAG</text><text x="1070" y="821" text-anchor="middle" class="tiny">RAG endpoint :18810</text><text x="1070" y="840" text-anchor="middle" class="port">Chroma obsidian_bge_npu</text></g>
<g><rect x="965" y="775" width="210" height="75" rx="9" fill="#0f172a"/><rect x="965" y="775" width="210" height="75" rx="9" fill="rgba(76,29,149,.4)" stroke="#a78bfa" stroke-width="1.6"/><text x="1070" y="802" text-anchor="middle" class="title">Obsidian / RAG</text><text x="1070" y="821" text-anchor="middle" class="tiny">:18810 semantic search</text><text x="1070" y="840" text-anchor="middle" class="port">NPU embed; optional rerank</text></g>
<g><rect x="965" y="870" width="210" height="80" rx="9" fill="#0f172a"/><rect x="965" y="870" width="210" height="80" rx="9" fill="rgba(244,63,94,.16)" stroke="#fb7185" stroke-width="1.6" stroke-dasharray="6,4"/><text x="1070" y="896" text-anchor="middle" class="title">NPU sidecars</text><text x="1070" y="917" text-anchor="middle" class="tiny">approved prototypes; not live</text><text x="1070" y="936" text-anchor="middle" class="port">:18818/:18819/:18820/:18829</text></g>
<!-- host local ai box -->
@@ -106,10 +106,10 @@
</div>
<div class="cards">
<div class="info"><h3>Monitoring model</h3><ul><li>• n8n direct probes critical ports</li><li>• agentmon aggregates Docker/OpenClaw snapshots</li><li>• n8n polls agentmon for stale/degraded state</li></ul></div>
<div class="info"><h3>Operational endpoints</h3><ul><li>• n8n: 127.0.0.1:18808</li><li>• agentmon query/UI: 8081 / 8082</li><li>• live NPU: RAG 18810, Whisper 18816, embeddings 18817</li><li>• prototypes not live-routed: 18818/18819/18820/18829</li></ul></div>
<div class="info"><h3>Operational endpoints</h3><ul><li>• n8n: 127.0.0.1:18808</li><li>• agentmon query/UI: 8081 / 8082</li><li>• live NPU: RAG 18810, Whisper 18816, embeddings 18817</li><li>• optional disabled rerank hook: 18818</li><li>• prototypes not live-routed: 18819/18820/18829</li></ul></div>
<div class="info"><h3>Source paths</h3><ul><li>• Swarm repo: ~/lab/swarm</li><li>• Agentmon repo: ~/lab/agentmon</li><li>• Workflows: swarm-common/n8n-workflows</li></ul></div>
</div>
<div class="footer">Generated as repo documentation. Open locally in a browser; no JavaScript, all SVG inline. Dashed red OpenVINO NPU sidecars are approved prototypes only and do not imply live Atlas/Hermes/RAG routing.</div>
<div class="footer">Generated as repo documentation. Open locally in a browser; no JavaScript, all SVG inline. Dashed red OpenVINO NPU sidecars are approved prototypes; only :18810 has a disabled-by-default request-time rerank hook to :18818, and no classifier/GenAI sidecar is live-routed.</div>
</div>
</body>
</html>
+12 -5
View File
@@ -126,7 +126,7 @@ Host/user services:
- `ollama.service` → `:18807`, legacy/CPU embeddings API fallback
- `openvino-embeddings.service` → `:18817`, OpenVINO NPU embeddings API (`/v1/embeddings`, `/api/embed`, `/api/embeddings`)
- `docker-health-endpoint.service` → `:18809`, read-only container health for n8n
- `obsidian-reindex-endpoint.service` → `:18810`, Obsidian/RAG reindex trigger; default collection `obsidian_bge_npu` using OpenVINO NPU embeddings
- `obsidian-reindex-endpoint.service` → `:18810`, Obsidian/RAG reindex trigger and `/semantic-search`; default collection `obsidian_bge_npu` using OpenVINO NPU embeddings, with optional request-time `:18818` reranking disabled by default
- `url-content-extractor.service` → `:18812`, YouTube/PDF/web extraction
- `voice-memo-processor.service` → `:18813`, voice memo processing
- `rag-embedding-health.service` → `:18814`, RAG/embedding health wrapper
@@ -159,7 +159,8 @@ RAG/vector store:
- Reindex state/progress: active BGE/NPU state in `~/.hermes/data/rag-search/obsidian_bge_npu_index_state.json` and `obsidian_bge_npu_reindex_progress.json`; legacy Ollama state in `obsidian_index_state.json` remains for comparison/fallback.
- Active RAG query/reindex embedding backend: OpenVINO NPU embeddings service on `:18817`, currently `bge-base-en-v1.5-int8-ov`, collection `obsidian_bge_npu`.
- Legacy comparison/fallback collection: `obsidian`, built with Ollama on `:18807` using `nomic-embed-text`.
- Reindex endpoint: `POST :18810/reindex` for incremental updates, `POST :18810/reindex?full=true` for full semantic rebuilds, `GET :18810/semantic-health` to verify vectors plus a search smoke test.
- Reindex/search endpoint: `POST :18810/reindex` for incremental updates, `POST :18810/reindex?full=true` for full semantic rebuilds, `GET :18810/semantic-health` to verify vectors plus a search smoke test, and `POST :18810/semantic-search` for n8n/Hermes semantic context lookup.
- Optional reranker path: `RAG_RERANK_ENABLED=false` by default. When enabled, `/semantic-search` retrieves `RAG_RERANK_INITIAL_K` vector candidates, calls `RAG_RERANK_URL` (`http://127.0.0.1:18818/rerank` by default), returns reranked `RAG_RERANK_TOP_K`, requires positive `npu_busy_delta_us` by default (`RAG_RERANK_REQUIRE_NPU_PROOF=true`), and falls back to vector order with `rerank.error` metadata on timeout/error/non-positive NPU proof. Reranking is request-time only and must not mutate Chroma/vector collections.
## Monitoring model
@@ -214,6 +215,11 @@ cd /home/will/lab/swarm
make status
make local-ai-health
./scripts/npu-service-health.sh # read-only; includes sysfs busy-time proof for :18817
curl -fsS http://127.0.0.1:18810/semantic-health | jq '{status,state,search_ok,result_count}'
curl -fsS http://127.0.0.1:18810/semantic-search \
-H 'Content-Type: application/json' \
-d '{"query":"non-private semantic smoke","top_k":2}' \
| jq '{ok,index,top_k,search_k,rerank,result_count}'
curl -fsS http://127.0.0.1:18808/healthz
curl -fsS http://127.0.0.1:8081/healthz
curl -fsS 'http://127.0.0.1:8081/v1/events?event_type=swarm.snapshot&limit=1' | jq .
@@ -223,8 +229,9 @@ From inside `n8n-agent`:
```bash
docker exec n8n-agent /bin/sh -lc '
wget -qO- -T 5 http://172.19.0.1:8081/healthz
wget -qO- -T 5 "http://172.19.0.1:8081/v1/events?event_type=swarm.snapshot&limit=1" | head -c 500
wget -qO- -T 5 http://172.19.0.1:18810/healthz
wget -qO- -T 5 http://172.19.0.1:18814/healthz
wget -qO- -T 5 http://172.19.0.1:18817/healthz | head -c 500
'
```
@@ -247,4 +254,4 @@ jq '.[0] | {id,name,active,nodes:(.nodes|length)}' /tmp/agentmon-export.json
- From `n8n-agent`, use `127.0.0.1:5678` for n8n itself and `172.19.0.1:<host-port>` for host-published swarm services.
- Agentmon `/healthz` only proves the web/API process is alive; pair it with snapshot freshness to prove the monitoring pipeline is flowing.
- OpenClaw is intentionally dormant unless explicitly re-enabled; do not alert on VMs being shut off by default.
- OpenVINO NPU sidecars on `:18818`, `:18819`, `:18820`, and optional `:18829` are prototypes/not-live unless a later approved change installs and routes them. Do not draw live Atlas/Hermes/RAG arrows to them in diagrams until that approval and implementation actually exist.
- OpenVINO NPU sidecars on `:18819`, `:18820`, and optional `:18829` are prototypes/not-live unless a later approved change installs and routes them. The `:18818` reranker is also a prototype service, but `:18810/semantic-search` now has a disabled-by-default request-time rerank hook that falls back safely when `:18818` is unavailable. Do not draw live Atlas/Hermes/classifier/GenAI arrows to prototypes until approval and implementation actually exist.
+142 -1
View File
@@ -21,14 +21,32 @@ import os
import subprocess
import sys
import threading
import time
from pathlib import Path
from urllib.parse import parse_qs, urlparse
from urllib import request, error
# Values (after strip/lower) that switch a boolean environment flag on.
_TRUTHY = frozenset({"1", "true", "yes", "on"})


def _env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment.

    An unset or empty variable yields *default*; any other value is true only
    when it is one of 1/true/yes/on (case-insensitive, surrounding whitespace
    ignored).
    """
    raw = os.environ.get(name) or ("true" if default else "false")
    return raw.strip().lower() in _TRUTHY


def _env_int(name: str, default: int, minimum: int = 1) -> int:
    """Read an integer setting from the environment, clamped to *minimum*.

    An unset or empty variable yields *default*. A value that is not a valid
    integer is reported on stderr and replaced by *default* instead of raising,
    so a typo in an optional setting cannot stop the module from importing.
    """
    raw = os.environ.get(name) or str(default)
    try:
        value = int(raw)
    except ValueError:
        print(
            f"[config] ignoring invalid {name}={raw!r}; using default {default}",
            file=sys.stderr,
        )
        value = default
    return max(minimum, value)


PORT = int(os.environ.get("PORT", 18810))
REINDEX_TIMEOUT = int(os.environ.get("REINDEX_TIMEOUT", "1800"))
RAG_COLLECTION = os.environ.get("RAG_COLLECTION", "obsidian").strip() or "obsidian"
RAG_EMBED_MODEL = os.environ.get("RAG_EMBED_MODEL", "nomic-embed-text").strip() or "nomic-embed-text"
OLLAMA_BASE_URL = (os.environ.get("OLLAMA_BASE_URL") or "http://127.0.0.1:18807").rstrip("/")

# Optional request-time reranking; off unless RAG_RERANK_ENABLED is truthy.
RAG_RERANK_ENABLED = _env_flag("RAG_RERANK_ENABLED", False)
RAG_RERANK_URL = (os.environ.get("RAG_RERANK_URL") or "http://127.0.0.1:18818/rerank").strip()
# Number of vector candidates fetched before reranking.
RAG_RERANK_INITIAL_K = _env_int("RAG_RERANK_INITIAL_K", 20)
# Upper bound on results returned after reranking.
RAG_RERANK_TOP_K = _env_int("RAG_RERANK_TOP_K", 5)
# HTTP timeout for the reranker call, in milliseconds.
RAG_RERANK_TIMEOUT_MS = _env_int("RAG_RERANK_TIMEOUT_MS", 3000)
# When true, a rerank response must report a positive npu_busy_delta_us.
RAG_RERANK_REQUIRE_NPU_PROOF = _env_flag("RAG_RERANK_REQUIRE_NPU_PROOF", True)
REINDEX_SCRIPT = str(
Path.home()
@@ -102,12 +120,125 @@ def get_status() -> dict:
return {"error": str(e)}
def _result_text(result: dict) -> str:
    """Pick the passage text that is sent to the reranker.

    ``text`` is preferred and ``content`` is the fallback; when both are
    missing or falsy the result is an empty string. The mapping is only read,
    so the response shape is left untouched.
    """
    for key in ("text", "content"):
        candidate = result.get(key)
        if candidate:
            return str(candidate)
    return ""
def _apply_rerank(query: str, results: list[dict], final_k: int) -> tuple[list[dict], dict]:
    """Optionally rerank semantic results, falling back to vector order on any error.

    Every result that has text is POSTed to ``RAG_RERANK_URL``. ``results`` is
    never mutated: reranked hits are shallow copies that gain ``rerank_score``
    and ``rerank_rank``.

    Args:
        query: Search query forwarded to the reranker.
        results: Vector-search hits, in vector order.
        final_k: Maximum number of hits to return.

    Returns:
        ``(hits, metadata)``. ``hits`` is the reranked list on success and
        ``results[:final_k]`` otherwise. ``metadata`` always carries
        ``enabled``, ``attempted``, ``ok``, ``url``, ``initial_k`` and
        ``top_k``. It gains ``reason`` when the reranker is skipped,
        ``duration_ms`` after any attempt, and ``error`` with ``ok`` set to
        False whenever an attempt falls back to vector order.
    """
    # Local import: urlopen()/read() can raise http.client errors (for example
    # IncompleteRead) that are neither OSError nor ValueError.
    from http.client import HTTPException

    metadata = {
        "enabled": RAG_RERANK_ENABLED,
        "attempted": False,
        "ok": False,
        "url": RAG_RERANK_URL,
        "initial_k": len(results),
        "top_k": final_k,
    }
    vector_order = results[:final_k]

    if not RAG_RERANK_ENABLED:
        metadata["ok"] = True
        metadata["reason"] = "disabled"
        return vector_order, metadata
    if not results:
        metadata["ok"] = True
        metadata["reason"] = "no_results"
        return [], metadata

    metadata["attempted"] = True
    documents = []
    # doc_positions[i] is the index in ``results`` of the i-th submitted document;
    # the two diverge as soon as a textless result is left out.
    doc_positions = []
    for idx, item in enumerate(results):
        text = _result_text(item)
        if not text:
            continue
        doc_positions.append(idx)
        documents.append(
            {
                "id": str(item.get("id") or idx),
                "text": text,
                "metadata": {
                    "index": idx,
                    "path": item.get("path"),
                    "source": item.get("source"),
                    "chunk": item.get("chunk"),
                },
            }
        )
    if not documents:
        metadata["ok"] = True
        metadata["reason"] = "no_text_documents"
        return vector_order, metadata

    started = time.monotonic()
    try:
        body = json.dumps(
            {
                "query": query,
                "documents": documents,
                "top_k": final_k,
                "return_documents": False,
            }
        ).encode("utf-8")
        req = request.Request(
            RAG_RERANK_URL,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with request.urlopen(req, timeout=RAG_RERANK_TIMEOUT_MS / 1000.0) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError, HTTPException) as exc:
        # OSError covers URLError/HTTPError/timeouts; ValueError covers invalid
        # JSON, undecodable bytes and a malformed RAG_RERANK_URL.
        metadata["duration_ms"] = round((time.monotonic() - started) * 1000, 2)
        metadata["error"] = f"{type(exc).__name__}: {exc}"
        return vector_order, metadata

    metadata["duration_ms"] = round((time.monotonic() - started) * 1000, 2)
    if not isinstance(payload, dict):
        metadata["error"] = "reranker returned a non-object JSON response"
        return vector_order, metadata

    metadata["ok"] = bool(payload.get("ok", True))
    metadata["model"] = payload.get("model")
    metadata["device"] = payload.get("device")
    metadata["npu_busy_delta_us"] = payload.get("npu_busy_delta_us")
    metadata["require_npu_proof"] = RAG_RERANK_REQUIRE_NPU_PROOF
    metadata["input_count"] = payload.get("input_count")

    ranked = payload.get("results") or []
    if not isinstance(ranked, list):
        ranked = []

    if RAG_RERANK_REQUIRE_NPU_PROOF:
        try:
            busy_delta = int(payload.get("npu_busy_delta_us") or 0)
        except (TypeError, ValueError, OverflowError):
            # A malformed proof value counts as "no proof", not as a crash.
            busy_delta = 0
        if busy_delta <= 0:
            metadata["ok"] = False
            metadata["error"] = "reranker response lacked positive npu_busy_delta_us"
            return vector_order, metadata

    if not metadata["ok"] or not ranked:
        # Falling back is a failed rerank even if the payload claimed ok=true.
        metadata["ok"] = False
        metadata["error"] = payload.get("error") or "reranker returned no ranked results"
        return vector_order, metadata

    index_by_id = {str(item.get("id") or idx): idx for idx, item in enumerate(results)}
    reranked = []
    used = set()
    for rank, ranked_item in enumerate(ranked):
        if not isinstance(ranked_item, dict):
            continue
        source_idx = None
        if "id" in ranked_item:
            source_idx = index_by_id.get(str(ranked_item.get("id")))
        if source_idx is None:
            position = ranked_item.get("index")
            # NOTE(review): "index" is read as a position in the submitted
            # ``documents`` list - confirm against the :18818 response contract.
            if (
                isinstance(position, int)
                and not isinstance(position, bool)
                and 0 <= position < len(doc_positions)
            ):
                source_idx = doc_positions[position]
        if source_idx is None or source_idx in used:
            # Unknown or repeated entries are skipped so no hit is returned twice.
            continue
        used.add(source_idx)
        merged = dict(results[source_idx])
        merged["rerank_score"] = ranked_item.get("score")
        merged["rerank_rank"] = rank + 1
        reranked.append(merged)
        if len(reranked) >= final_k:
            break

    if not reranked:
        metadata["ok"] = False
        metadata["error"] = "reranker result IDs did not match search results"
        return vector_order, metadata
    return reranked, metadata
def run_semantic_search(query: str, top_k: int = 5) -> dict:
"""Query the local Obsidian Chroma index via the rag-search script."""
query = (query or "").strip()
if not query:
return {"ok": False, "error": "query is required", "results": []}
top_k = max(1, min(int(top_k or 5), 20))
search_k = max(top_k, min(RAG_RERANK_INITIAL_K, 100)) if RAG_RERANK_ENABLED else top_k
final_k = min(top_k, RAG_RERANK_TOP_K) if RAG_RERANK_ENABLED else top_k
env = os.environ.copy()
env.setdefault("RAG_COLLECTION", RAG_COLLECTION)
env.setdefault("RAG_EMBED_MODEL", RAG_EMBED_MODEL)
@@ -119,7 +250,7 @@ def run_semantic_search(query: str, top_k: int = 5) -> dict:
"--index",
RAG_COLLECTION,
"--top-k",
str(top_k),
str(search_k),
"--raw",
query,
],
@@ -133,17 +264,27 @@ def run_semantic_search(query: str, top_k: int = 5) -> dict:
"ok": False,
"query": query,
"top_k": top_k,
"search_k": search_k,
"error": result.stderr.strip()[-2000:] or result.stdout.strip()[-2000:],
"results": [],
"rerank": {
"enabled": RAG_RERANK_ENABLED,
"attempted": False,
"ok": False,
"error": "vector search failed before rerank",
},
}
payload = json.loads(result.stdout)
results = payload.get("results") or []
results, rerank_meta = _apply_rerank(query, results, final_k)
return {
"ok": True,
"query": query,
"index": payload.get("index", RAG_COLLECTION),
"top_k": top_k,
"search_k": search_k,
"result_count": len(results),
"rerank": rerank_meta,
"results": results,
}
@@ -11,6 +11,14 @@ Environment=PORT=18810
Environment=RAG_COLLECTION=obsidian_bge_npu
Environment=RAG_EMBED_MODEL=bge-base-en-v1.5-int8-ov
Environment=OLLAMA_BASE_URL=http://127.0.0.1:18817
# Optional request-time second-stage reranking. Disabled by default so :18810
# keeps working when the :18818 prototype is stopped or not yet approved live.
Environment=RAG_RERANK_ENABLED=false
Environment=RAG_RERANK_URL=http://127.0.0.1:18818/rerank
Environment=RAG_RERANK_INITIAL_K=20
Environment=RAG_RERANK_TOP_K=5
Environment=RAG_RERANK_TIMEOUT_MS=3000
Environment=RAG_RERANK_REQUIRE_NPU_PROOF=true
[Install]
WantedBy=default.target
+138
View File
@@ -0,0 +1,138 @@
import importlib.util
import json
import subprocess
import sys
import types
import unittest
from pathlib import Path
from typing import cast
from unittest import mock
# Location of the server script under test, relative to this test file.
MODULE_PATH = Path(__file__).resolve().parents[1].joinpath("scripts", "obsidian-reindex-server.py")


def load_module():
    """Import the server script at ``MODULE_PATH`` and return the module object.

    The script is loaded by file path under the module name
    ``obsidian_reindex_server`` and registered in ``sys.modules`` before its
    body is executed. Each call builds and executes a new module object.
    """
    module_spec = importlib.util.spec_from_file_location("obsidian_reindex_server", MODULE_PATH)
    assert module_spec is not None and module_spec.loader is not None
    loaded = importlib.util.module_from_spec(module_spec)
    sys.modules[module_spec.name] = loaded
    module_spec.loader.exec_module(loaded)
    return cast(types.ModuleType, loaded)
class SemanticSearchRerankTests(unittest.TestCase):
    """Cover ``run_semantic_search`` with reranking off, working, and failing."""

    def setUp(self):
        # Load a new module object per test so attribute overrides stay local.
        self.server = load_module()
        self.results = [
            {"id": "a", "text": "alpha doc", "path": "a.md", "score": 0.1},
            {"id": "b", "text": "beta doc", "path": "b.md", "score": 0.2},
            {"id": "c", "text": "gamma doc", "path": "c.md", "score": 0.3},
        ]

    def _mock_search_run(self, expected_top_k=None):
        """Build a ``subprocess.run`` stand-in that returns ``self.results`` as JSON.

        When ``expected_top_k`` is given, the fake also asserts the value that
        follows ``--top-k`` on the command line.
        """

        def fake_run(cmd, capture_output, text, timeout, env):
            if expected_top_k is not None:
                flag_at = cmd.index("--top-k")
                self.assertEqual(cmd[flag_at + 1], str(expected_top_k))
            stdout = json.dumps({"index": "obsidian_bge_npu", "results": self.results})
            return subprocess.CompletedProcess(cmd, 0, stdout=stdout, stderr="")

        return fake_run

    def _enable_rerank(self, **overrides):
        """Turn reranking on with initial_k=3 / top_k=2, plus any extra overrides."""
        settings = {
            "RAG_RERANK_ENABLED": True,
            "RAG_RERANK_INITIAL_K": 3,
            "RAG_RERANK_TOP_K": 2,
        }
        settings.update(overrides)
        for name, value in settings.items():
            setattr(self.server, name, value)

    @staticmethod
    def _reranker_reply(body):
        """Return a context-manager response whose ``read()`` yields *body* as JSON bytes."""
        reply = mock.MagicMock()
        reply.__enter__.return_value = reply
        reply.read.return_value = json.dumps(body).encode()
        return reply

    def _search(self, expected_top_k, **urlopen_patch):
        """Run the search with ``subprocess.run`` faked and, optionally, ``urlopen`` patched."""
        patchers = [
            mock.patch.object(
                self.server.subprocess,
                "run",
                self._mock_search_run(expected_top_k=expected_top_k),
            )
        ]
        if urlopen_patch:
            patchers.append(mock.patch.object(self.server.request, "urlopen", **urlopen_patch))
        for patcher in patchers:
            patcher.start()
            self.addCleanup(patcher.stop)
        return self.server.run_semantic_search("npu smoke", top_k=2)

    @staticmethod
    def _ids(payload):
        """List the ``id`` of every returned result, in order."""
        return [item["id"] for item in payload["results"]]

    def test_disabled_rerank_preserves_vector_order(self):
        setattr(self.server, "RAG_RERANK_ENABLED", False)

        payload = self._search(2)

        self.assertTrue(payload["ok"])
        self.assertEqual(payload["search_k"], 2)
        self.assertEqual(self._ids(payload), ["a", "b"])
        rerank = payload["rerank"]
        self.assertEqual(rerank["reason"], "disabled")
        self.assertFalse(rerank["attempted"])

    def test_enabled_rerank_reorders_matching_results(self):
        self._enable_rerank()
        reply = self._reranker_reply(
            {
                "ok": True,
                "model": "synthetic-reranker",
                "device": "NPU",
                "npu_busy_delta_us": 123,
                "results": [
                    {"id": "c", "score": 9.0},
                    {"id": "a", "score": 7.0},
                ],
            }
        )

        payload = self._search(3, return_value=reply)

        self.assertEqual(self._ids(payload), ["c", "a"])
        rerank = payload["rerank"]
        self.assertTrue(rerank["attempted"])
        self.assertTrue(rerank["ok"])
        self.assertEqual(rerank["npu_busy_delta_us"], 123)
        self.assertEqual(payload["results"][0]["rerank_rank"], 1)

    def test_enabled_rerank_error_falls_back_to_vector_order(self):
        self._enable_rerank()

        payload = self._search(3, side_effect=OSError("reranker unavailable"))

        self.assertEqual(self._ids(payload), ["a", "b"])
        rerank = payload["rerank"]
        self.assertTrue(rerank["attempted"])
        self.assertFalse(rerank["ok"])
        self.assertIn("reranker unavailable", rerank["error"])

    def test_enabled_rerank_requires_positive_npu_proof(self):
        self._enable_rerank(RAG_RERANK_REQUIRE_NPU_PROOF=True)
        reply = self._reranker_reply(
            {
                "ok": True,
                "device": "NPU",
                "npu_busy_delta_us": 0,
                "results": [{"id": "c", "score": 9.0}],
            }
        )

        payload = self._search(3, return_value=reply)

        self.assertEqual(self._ids(payload), ["a", "b"])
        rerank = payload["rerank"]
        self.assertTrue(rerank["attempted"])
        self.assertFalse(rerank["ok"])
        self.assertIn("positive npu_busy_delta_us", rerank["error"])
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()