3 Commits

Author SHA1 Message Date
William Valentin 5b01b1bd11 feat: add OpenVINO NPU prototype services 2026-06-04 11:41:55 -07:00
Atlas Ops d67c259187 docs: add OpenVINO NPU services runbook 2026-06-04 11:38:54 -07:00
Hermes Agent 4003198ba9 feat: add OpenVINO router classifier prototype 2026-06-04 11:38:54 -07:00
33 changed files with 3333 additions and 1 deletions
+122
View File
@@ -0,0 +1,122 @@
# OpenVINO NPU router classifier prototype
Dry-run Atlas/Hermes message classifier/router prototype.
It reuses the existing OpenVINO NPU embeddings service on `127.0.0.1:18817` and
serves an inspectable stdlib HTTP API on `127.0.0.1:18819`. It does not change
live Hermes/Atlas routing, write memory, mutate vector collections, restart
services, or send external messages.
## Runtime shape
- Service: `atlas-router-classifier`
- Default port: `18819`
- Default bind: `127.0.0.1`
- Upstream: `http://127.0.0.1:18817/v1/embeddings`
- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0`
- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` before/after plus upstream `npu_busy_delta_us`
The classifier uses deterministic high-precision rules for safety/urgency/tool
signals plus cosine similarity against curated embedding prototypes for workflow
and memory recommendations. This is intentionally tunable without model training.
## API
### GET `/healthz`
Returns service metadata, labels, prototype count, NPU sysfs counter, and warmup
NPU delta.
### GET `/v1/labels`
Returns label enum values, thresholds, and prototype IDs without dumping private
fixtures.
### POST `/v1/classify`
Request:
```json
{
"id": "optional trace id",
"text": "User message or task body to classify.",
"context": {"platform": "cli", "source": "user"},
"options": {
"include_evidence": true,
"include_embedding_debug": false,
"dry_run": true
}
}
```
Response includes:
- `labels.tool_needed`: boolean, confidence, threshold, reason codes
- `labels.memory_candidate`: `none | user_preference | durable_user_fact | environment_fact | workflow_convention | skill_candidate`
- `labels.urgency`: `low | normal | high | critical`
- `labels.workflow_category`: `chat | research | coding | debugging | devops | smart_home | media | note_taking | productivity | kanban | unknown`
- `labels.safety_confirmation_required`: boolean, confidence, reason codes
- `npu_busy_delta_us` and `sysfs_npu_busy_delta_us`
- `evidence` when requested
### POST `/v1/batch_classify`
Request:
```json
{
"items": [{"id": "m1", "text": "What time is it?"}],
"options": {"include_evidence": false, "dry_run": true}
}
```
## Local smoke test
Check that the proposed port is free first:
```bash
ss -ltnp | grep ':18819' || true
```
Run without installing anything extra; `/home/will/.venvs/npu` already has the
stdlib plus requests/openvino stack used by the upstream embeddings service:
```bash
cd /home/will/lab/swarm/openvino-classifier-npu
/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819
```
Then from another shell:
```bash
curl -fsS http://127.0.0.1:18819/healthz | jq .
curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"smoke","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true}}' | jq .
```
A valid NPU-backed response must have positive `npu_busy_delta_us`; HTTP 200 by
itself is not considered proof.
## Tests
Unit tests use a fake embedding client and do not touch the NPU:
```bash
/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v
```
Fixture messages live at `fixtures/atlas_hermes_messages.jsonl`.
## Optional systemd user unit
A draft unit is included as `openvino-router-classifier.service`. Install only
after review/approval:
```bash
cp openvino-router-classifier.service ~/.config/systemd/user/openvino-router-classifier.service
systemctl --user daemon-reload
systemctl --user enable --now openvino-router-classifier.service
```
Do not enable it as part of this prototype task without explicit approval.
@@ -0,0 +1,10 @@
{"id":"tool-time","text":"What time is it in Seattle right now?","expected":{"tool_needed":true,"workflow_category":"chat","urgency":"normal","safety_confirmation_required":false}}
{"id":"memory-preference","text":"Remember that I prefer concise answers in the terminal.","expected":{"memory_candidate":"user_preference","tool_needed":false,"safety_confirmation_required":false}}
{"id":"coding-debug","text":"Debug the failing pytest suite and inspect the git diff before opening a PR.","expected":{"tool_needed":true,"workflow_category":"debugging","urgency":"normal"}}
{"id":"devops-urgent","text":"Urgent: the embeddings service on port 18817 is down; check systemd logs and restore it.","expected":{"tool_needed":true,"workflow_category":"devops","urgency":"high"}}
{"id":"safety-routing","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","expected":{"tool_needed":true,"workflow_category":"devops","safety_confirmation_required":true}}
{"id":"destructive-reindex","text":"Delete the existing Chroma collection and reindex the Obsidian vault in place.","expected":{"tool_needed":true,"workflow_category":"note_taking","safety_confirmation_required":true}}
{"id":"research","text":"Research current OpenVINO NPU support for TinyBERT sequence classification and summarize sources.","expected":{"tool_needed":true,"workflow_category":"research"}}
{"id":"smart-home","text":"Turn off the living room lights and set the thermostat to 68.","expected":{"tool_needed":true,"workflow_category":"smart_home"}}
{"id":"media","text":"Transcribe this voice memo and extract action items.","expected":{"tool_needed":true,"workflow_category":"media"}}
{"id":"kanban","text":"Work kanban task t_5e123496 and block it if review is required.","expected":{"tool_needed":true,"workflow_category":"kanban"}}
@@ -0,0 +1,17 @@
[Unit]
Description=Atlas/Hermes dry-run OpenVINO router classifier
After=network.target openvino-embeddings.service
Wants=openvino-embeddings.service
[Service]
Type=simple
WorkingDirectory=/home/will/lab/swarm/openvino-classifier-npu
Environment=OPENVINO_CLASSIFIER_HOST=127.0.0.1
Environment=OPENVINO_CLASSIFIER_PORT=18819
Environment=OPENVINO_CLASSIFIER_EMBED_URL=http://127.0.0.1:18817/v1/embeddings
ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-classifier-npu/router_classifier.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=default.target
@@ -0,0 +1,532 @@
#!/usr/bin/env python3
"""Dry-run Atlas/Hermes router classifier backed by the local OpenVINO NPU embedding service.
Default port: 18819
Default upstream: http://127.0.0.1:18817/v1/embeddings
This service is intentionally advisory only. It does not write memory, mutate routing,
restart services, or call external APIs. NPU execution is proved by the upstream
embedding service's npu_busy_delta_us and by reading the local sysfs busy counter.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
VERSION = "0.1.0"
SERVICE = "atlas-router-classifier"
MODEL = "bge-base-en-v1.5-int8-ov/prototype-router-v0"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 18819
DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
WORKFLOW_CATEGORIES = [
"chat",
"research",
"coding",
"debugging",
"devops",
"smart_home",
"media",
"note_taking",
"productivity",
"kanban",
"unknown",
]
MEMORY_VALUES = ["none", "user_preference", "durable_user_fact", "environment_fact", "workflow_convention", "skill_candidate"]
URGENCY_VALUES = ["low", "normal", "high", "critical"]
PROTOTYPES: dict[str, list[str]] = {
"tool_needed": [
"check the current date time weather news versions or live facts",
"inspect files git branches logs ports processes disk memory or system state",
"send a message create a cron job call an API or interact with a local service",
"search the web browse a website download or verify current information",
],
"memory_user_preference": [
"remember that I prefer concise replies and a direct style",
"my preference is use short answers and avoid unnecessary detail",
"please remember I like this convention for future sessions",
],
"memory_durable_user_fact": [
"remember that I live in Seattle and work on local AI infrastructure",
"my name role location identity or durable personal detail is",
],
"memory_environment_fact": [
"this project uses pytest and this server runs linux with openvino npu",
"remember this repository convention service port path or environment setup",
],
"memory_workflow_convention": [
"for this workflow use this recurring procedure convention or process",
"the team convention is to run checks before code review and use a worktree",
],
"memory_skill_candidate": [
"we discovered a reusable multi step workflow that should become a skill",
"save this procedure as a reusable skill after solving a tricky task",
],
"urgency_low": [
"whenever convenient no rush low priority idea someday backlog",
],
"urgency_high": [
"urgent asap high priority today please handle soon production issue",
"service is degraded broken failing down users are blocked",
],
"urgency_critical": [
"critical outage security incident data loss production down emergency now",
"stop the bleeding rollback immediately credentials leaked destructive incident",
],
"workflow_chat": [
"answer a general question explain a concept brainstorm rewrite text chat casually",
],
"workflow_research": [
"research compare summarize sources papers market docs web search literature review",
],
"workflow_coding": [
"implement code write tests refactor add feature fix type errors create a branch",
],
"workflow_debugging": [
"debug failing tests inspect logs reproduce error traceback diagnose regression",
],
"workflow_devops": [
"operate services systemd docker kubernetes ports health checks deploy infrastructure",
],
"workflow_smart_home": [
"turn on lights adjust thermostat control tv speaker home assistant hue wiz",
],
"workflow_media": [
"transcribe audio process video image gif spotify music youtube media file",
],
"workflow_note_taking": [
"obsidian notes daily diary memory knowledge base document personal context",
],
"workflow_productivity": [
"calendar email spreadsheet presentation notion airtable linear task planning",
],
"workflow_kanban": [
"kanban task board card assignee handoff review required blocked complete worker",
],
}
RULES: dict[str, list[tuple[re.Pattern[str], str, float]]] = {
"tool_needed": [
(re.compile(r"\b(current|today|now|latest|weather|news|version|price|stock)\b", re.I), "current_fact_requested", 0.88),
(re.compile(r"\b(file|directory|git|branch|commit|diff|log|port|process|disk|memory|cpu|gpu|npu|service|systemd|reindex)\b", re.I), "local_state_requested", 0.84),
(re.compile(r"\b(send|schedule|create cron|call api|download|browse|search web|open website|turn on|turn off|set the thermostat|transcribe|restart|switch primary routing|work kanban|kanban task)\b", re.I), "external_or_tool_action_requested", 0.86),
],
"safety": [
(re.compile(r"\b(delete|remove|overwrite|drop|truncate|wipe|reindex|reset --hard|force push)\b", re.I), "destructive_or_irreversible_action", 0.92),
(re.compile(r"\b(restart|stop|deploy|expose|public|0\.0\.0\.0|route live|primary routing|gateway)\b", re.I), "live_service_or_routing_change", 0.88),
(re.compile(r"\b(secret|token|api key|credential|password|private document|external upload|send message|spend money|purchase)\b", re.I), "credential_privacy_or_external_side_effect", 0.9),
],
"memory": [
(re.compile(r"\b(remember that|please remember|don'?t forget|my preference|I prefer|call me)\b", re.I), "explicit_memory_language", 0.9),
(re.compile(r"\b(always|for future|going forward|convention|workflow|standard practice)\b", re.I), "durable_convention_language", 0.78),
],
"urgency_high": [
(re.compile(r"\b(urgent|asap|immediately|high priority|production|down|broken|blocked)\b", re.I), "urgent_language", 0.84),
],
"urgency_critical": [
(re.compile(r"\b(critical|emergency|outage|data loss|credential leak|security incident|prod down)\b", re.I), "critical_incident_language", 0.94),
],
}
def npu_busy_time_us() -> int | None:
try:
return int(NPU_BUSY_FILE.read_text().strip())
except Exception:
return None
def clamp01(value: float) -> float:
return max(0.0, min(1.0, value))
def cosine(a: list[float], b: list[float]) -> float:
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
if na == 0.0 or nb == 0.0:
return 0.0
# Map [-1, 1] to [0, 1] for confidence-like scoring.
return clamp01((dot / (na * nb) + 1.0) / 2.0)
def best_rule(text: str, group: str) -> tuple[float, list[str], list[dict[str, Any]]]:
best = 0.0
codes: list[str] = []
evidence: list[dict[str, Any]] = []
for pattern, code, score in RULES.get(group, []):
match = pattern.search(text)
if match:
best = max(best, score)
codes.append(code)
evidence.append({"label": group, "source": "rule", "matched": match.group(0), "reason_code": code, "score": score})
return best, sorted(set(codes)), evidence
@dataclass
class EmbedResult:
vectors: list[list[float]]
npu_busy_delta_us: int | None
duration_ms: float
embedding_dim: int | None
class EmbeddingClient:
def __init__(self, url: str, timeout_s: float = 30.0) -> None:
self.url = url
self.timeout_s = timeout_s
def embed(self, texts: list[str], *, purpose: str = "query") -> EmbedResult:
payload = json.dumps({"input": texts, "purpose": purpose}).encode("utf-8")
request = urllib.request.Request(
self.url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
started = time.perf_counter()
try:
with urllib.request.urlopen(request, timeout=self.timeout_s) as response: # noqa: S310 - local configured URL
body = response.read().decode("utf-8", "replace")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", "replace")
raise RuntimeError(f"embedding service HTTP {exc.code}: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"embedding service unavailable at {self.url}: {exc.reason}") from exc
data = json.loads(body)
vectors = [item["embedding"] for item in data.get("data", [])]
return EmbedResult(
vectors=[[float(x) for x in vec] for vec in vectors],
npu_busy_delta_us=data.get("npu_busy_delta_us"),
duration_ms=round((time.perf_counter() - started) * 1000, 3),
embedding_dim=data.get("embedding_dim") or (len(vectors[0]) if vectors else None),
)
class ClassifierService:
def __init__(self, embed_url: str, *, timeout_s: float = 30.0) -> None:
self.embed_url = embed_url
self.client = EmbeddingClient(embed_url, timeout_s=timeout_s)
self.loaded_at = time.time()
self.prototype_texts: list[str] = []
self.prototype_keys: list[str] = []
for key, examples in PROTOTYPES.items():
for example in examples:
self.prototype_keys.append(key)
self.prototype_texts.append(example)
self.prototype_vectors: list[list[float]] | None = None
self.prototype_npu_busy_delta_us: int | None = None
self.embedding_dim: int | None = None
self.warnings: list[str] = []
def warmup(self) -> None:
result = self.client.embed(self.prototype_texts, purpose="document")
self.prototype_vectors = result.vectors
self.prototype_npu_busy_delta_us = result.npu_busy_delta_us
self.embedding_dim = result.embedding_dim
if not result.npu_busy_delta_us or result.npu_busy_delta_us <= 0:
self.warnings.append("prototype embedding warmup did not report positive NPU busy delta")
def health(self) -> dict[str, Any]:
return {
"status": "ok" if self.prototype_vectors else "starting",
"service": SERVICE,
"version": VERSION,
"mode": "dry_run",
"model": MODEL,
"embed_url": self.embed_url,
"device": "NPU-via-embedding-service",
"labels": ["tool_needed", "memory_candidate", "urgency", "workflow_category", "safety_confirmation_required"],
"embedding_dim": self.embedding_dim,
"prototype_count": len(self.prototype_texts),
"prototype_npu_busy_delta_us": self.prototype_npu_busy_delta_us,
"npu_busy_time_us": npu_busy_time_us(),
"uptime_s": round(time.time() - self.loaded_at, 3),
"warnings": self.warnings,
}
def labels(self) -> dict[str, Any]:
return {
"model": MODEL,
"thresholds": {
"tool_needed": 0.72,
"memory_candidate": 0.78,
"safety_confirmation_required": 0.80,
"workflow_category": 0.52,
},
"enums": {"memory_candidate": MEMORY_VALUES, "urgency": URGENCY_VALUES, "workflow_category": WORKFLOW_CATEGORIES},
"prototype_ids": sorted(PROTOTYPES),
}
def classify(self, item_id: str | None, text: str, options: dict[str, Any] | None = None) -> dict[str, Any]:
if self.prototype_vectors is None:
self.warmup()
options = options or {}
include_evidence = bool(options.get("include_evidence", True))
include_embedding_debug = bool(options.get("include_embedding_debug", False))
dry_run = bool(options.get("dry_run", True))
started = time.perf_counter()
text = str(text or "")
if not text.strip():
raise ValueError("text must be a non-empty string")
sysfs_before = npu_busy_time_us()
embedded = self.client.embed([text], purpose="query")
sysfs_after = npu_busy_time_us()
if not embedded.vectors:
raise RuntimeError("embedding service returned no vectors")
message_vec = embedded.vectors[0]
similarities = self._prototype_scores(message_vec)
evidence: list[dict[str, Any]] = []
labels: dict[str, Any] = {}
tool_rule, tool_codes, tool_evidence = best_rule(text, "tool_needed")
tool_proto = max([similarities.get("tool_needed", 0.0)], default=0.0)
# Similarity alone is too broad for action classification; require either
# a deterministic rule hit or a very strong prototype match.
tool_conf = round(max(tool_rule, tool_proto if tool_proto >= 0.88 else 0.0), 3)
labels["tool_needed"] = {"value": tool_conf >= 0.72, "confidence": tool_conf, "threshold": 0.72, "reason_codes": tool_codes}
evidence.extend(tool_evidence)
if tool_proto > 0:
evidence.append({"label": "tool_needed", "source": "prototype_similarity", "prototype": "tool_needed", "score": round(tool_proto, 3)})
mem_label, mem_conf, mem_codes, mem_ev = self._memory_label(text, similarities)
labels["memory_candidate"] = {"value": mem_label, "confidence": round(mem_conf, 3), "threshold": 0.78, "reason_codes": mem_codes}
evidence.extend(mem_ev)
urgency_value, urgency_conf, urgency_scores, urgency_codes, urgency_ev = self._urgency_label(text, similarities)
labels["urgency"] = {"value": urgency_value, "confidence": round(urgency_conf, 3), "scores": {k: round(v, 3) for k, v in urgency_scores.items()}, "reason_codes": urgency_codes}
evidence.extend(urgency_ev)
workflow_value, workflow_conf, workflow_scores, workflow_ev = self._workflow_label(similarities, text)
labels["workflow_category"] = {"value": workflow_value, "confidence": round(workflow_conf, 3), "scores": {k: round(v, 3) for k, v in workflow_scores.items()}}
evidence.extend(workflow_ev)
safety_rule, safety_codes, safety_evidence = best_rule(text, "safety")
safety_proto = 0.0
safety_conf = round(max(safety_rule, safety_proto), 3)
labels["safety_confirmation_required"] = {"value": safety_conf >= 0.80, "confidence": safety_conf, "threshold": 0.80, "reason_codes": safety_codes}
evidence.extend(safety_evidence)
npu_delta = embedded.npu_busy_delta_us
sysfs_delta = None if sysfs_before is None or sysfs_after is None else sysfs_after - sysfs_before
warnings = list(self.warnings)
if not npu_delta or npu_delta <= 0:
warnings.append("embedding call did not report positive npu_busy_delta_us; NPU execution not proven for this request")
if sysfs_delta is not None and sysfs_delta <= 0:
warnings.append("sysfs npu_busy_time_us did not increase during classification request")
response: dict[str, Any] = {
"id": item_id,
"model": MODEL,
"created": int(time.time()),
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"npu_busy_delta_us": npu_delta,
"sysfs_npu_busy_delta_us": sysfs_delta,
"dry_run": dry_run,
"labels": labels,
"warnings": warnings,
}
if include_evidence:
response["evidence"] = evidence[:30]
if include_embedding_debug:
response["embedding_debug"] = {"embedding_dim": len(message_vec), "prototype_scores": {k: round(v, 3) for k, v in similarities.items()}}
return response
def batch_classify(self, items: list[dict[str, Any]], options: dict[str, Any] | None = None) -> dict[str, Any]:
started = time.perf_counter()
results = [self.classify(item.get("id"), str(item.get("text") or ""), options) for item in items]
return {
"model": MODEL,
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"npu_busy_delta_us": sum((r.get("npu_busy_delta_us") or 0) for r in results),
"results": results,
}
def _prototype_scores(self, vec: list[float]) -> dict[str, float]:
assert self.prototype_vectors is not None
scores: dict[str, float] = {}
for key, prototype_vec in zip(self.prototype_keys, self.prototype_vectors):
scores[key] = max(scores.get(key, 0.0), cosine(vec, prototype_vec))
return scores
def _memory_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, list[str], list[dict[str, Any]]]:
rule_score, codes, evidence = best_rule(text, "memory")
candidates = {
"user_preference": scores.get("memory_user_preference", 0.0),
"durable_user_fact": scores.get("memory_durable_user_fact", 0.0),
"environment_fact": scores.get("memory_environment_fact", 0.0),
"workflow_convention": scores.get("memory_workflow_convention", 0.0),
"skill_candidate": scores.get("memory_skill_candidate", 0.0),
}
label, proto_score = max(candidates.items(), key=lambda kv: kv[1])
confidence = max(proto_score, rule_score)
explicit_memory = rule_score >= 0.78
durable_fact_hint = bool(re.search(r"\b(project uses|repo uses|environment uses|runs on|standard practice|convention|workflow convention)\b", text, re.I))
if explicit_memory:
if re.search(r"\b(prefer|preference|call me|my name|I live|I am)\b", text, re.I):
label = "user_preference" if re.search(r"\b(prefer|preference)\b", text, re.I) else "durable_user_fact"
elif durable_fact_hint:
label = "environment_fact"
elif re.search(r"\b(skill|procedure|workflow)\b", text, re.I):
label = "skill_candidate"
# BGE prototype similarities are advisory but broad; avoid recommending
# memory writes from similarity alone unless the text also has durable-
# fact language or an unusually strong prototype match.
if confidence < 0.78 or (not explicit_memory and not durable_fact_hint and proto_score < 0.88):
label = "none"
else:
evidence.append({"label": "memory_candidate", "source": "prototype_similarity", "prototype": f"memory_{label}", "score": round(proto_score, 3)})
return label, confidence if label != "none" else max(0.0, min(confidence, 0.77)), codes, evidence
def _urgency_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, dict[str, float], list[str], list[dict[str, Any]]]:
high_rule, high_codes, high_ev = best_rule(text, "urgency_high")
critical_rule, critical_codes, critical_ev = best_rule(text, "urgency_critical")
low_rule = 0.82 if re.search(r"\b(no rush|whenever convenient|low priority|someday|backlog)\b", text, re.I) else 0.0
# Urgency is safety-sensitive for notifications. Prefer explicit rules;
# use prototype scores only when they are unusually strong.
score_map = {
"low": max(low_rule, scores.get("urgency_low", 0.0) if scores.get("urgency_low", 0.0) >= 0.9 else 0.0),
"normal": 0.68,
"high": max(high_rule, scores.get("urgency_high", 0.0) if scores.get("urgency_high", 0.0) >= 0.9 else 0.0),
"critical": max(critical_rule, scores.get("urgency_critical", 0.0) if scores.get("urgency_critical", 0.0) >= 0.92 else 0.0),
}
if score_map["critical"] >= 0.9:
score_map["normal"] = 0.05
elif score_map["high"] >= 0.8 or score_map["low"] >= 0.8:
score_map["normal"] = 0.2
value, confidence = max(score_map.items(), key=lambda kv: kv[1])
evidence = high_ev + critical_ev
return value, confidence, score_map, sorted(set(high_codes + critical_codes)), evidence
def _workflow_label(self, scores: dict[str, float], text: str = "") -> tuple[str, float, dict[str, float], list[dict[str, Any]]]:
score_map = {category: scores.get(f"workflow_{category}", 0.0) for category in WORKFLOW_CATEGORIES if category != "unknown"}
rule_patterns: list[tuple[str, str]] = [
("chat", r"\bwhat time is it|what date is it|general question\b"),
("kanban", r"\bkanban|task card|review-required|blocked\b"),
("smart_home", r"\blights?|thermostat|home assistant|hue|wiz\b"),
("media", r"\btranscribe|voice memo|audio|video|image|spotify|youtube\b"),
("research", r"\bresearch|compare sources|papers?|literature|web search\b"),
("devops", r"\bsystemd|docker|kubernetes|service|ports?|gateway|deploy|infrastructure\b"),
("debugging", r"\bdebug|failing|traceback|logs?|reproduce|diagnose\b"),
("coding", r"\bimplement|code|pytest|refactor|feature|PR\b"),
("note_taking", r"\bobsidian|notes?|memory|diary|chroma|reindex\b"),
("productivity", r"\bcalendar|email|spreadsheet|presentation|notion|airtable|linear\b"),
]
rule_value: str | None = None
for category, pattern in rule_patterns:
if re.search(pattern, text, re.I):
rule_value = category
break
if rule_value:
value = rule_value
confidence = max(0.86, score_map.get(rule_value, 0.0))
score_map[rule_value] = confidence
source = "rule"
else:
value, confidence = max(score_map.items(), key=lambda kv: kv[1])
source = "prototype_similarity"
if confidence < 0.52:
value = "unknown"
confidence = 0.52
score_map["unknown"] = 1.0 - confidence if value != "unknown" else confidence
evidence = [{"label": "workflow_category", "source": source, "prototype": f"workflow_{value}", "score": round(confidence, 3)}]
return value, confidence, score_map, evidence
class Handler(BaseHTTPRequestHandler):
server_version = "AtlasRouterClassifier/0.1"
@property
def svc(self) -> ClassifierService:
return self.server.classifier_service # type: ignore[attr-defined]
def do_GET(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path in {"/", "/healthz", "/readyz", "/health"}:
self.write_json(self.svc.health())
elif path == "/v1/labels":
self.write_json(self.svc.labels())
else:
self.write_json({"error": "not found"}, status=404)
def do_POST(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
try:
payload = self.read_json()
options = payload.get("options") if isinstance(payload.get("options"), dict) else {}
if path == "/v1/classify":
self.write_json(self.svc.classify(payload.get("id"), str(payload.get("text") or ""), options))
elif path == "/v1/batch_classify":
items = payload.get("items")
if not isinstance(items, list):
raise ValueError("items must be a list")
self.write_json(self.svc.batch_classify(items, options))
else:
self.write_json({"error": "not found"}, status=404)
except ValueError as exc:
self.write_json({"error": str(exc)}, status=400)
except Exception as exc:
self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500)
def read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length") or 0)
body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
payload = json.loads(body or "{}")
if not isinstance(payload, dict):
raise ValueError("JSON body must be an object")
return payload
def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name
print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
def main() -> int:
parser = argparse.ArgumentParser(description="Dry-run Atlas/Hermes router classifier")
parser.add_argument("--host", default=os.environ.get("OPENVINO_CLASSIFIER_HOST", DEFAULT_HOST))
parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT)))
parser.add_argument("--embed-url", default=os.environ.get("OPENVINO_CLASSIFIER_EMBED_URL", DEFAULT_EMBED_URL))
parser.add_argument("--timeout-s", type=float, default=float(os.environ.get("OPENVINO_CLASSIFIER_TIMEOUT_S", "30")))
parser.add_argument("--no-warmup", action="store_true", help="skip prototype embedding warmup until first request")
args = parser.parse_args()
service = ClassifierService(args.embed_url, timeout_s=args.timeout_s)
if not args.no_warmup:
service.warmup()
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.classifier_service = service # type: ignore[attr-defined]
print(f"{SERVICE} listening on {args.host}:{args.port} embed_url={args.embed_url} mode=dry_run", flush=True)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
from __future__ import annotations
import importlib.util
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
MODULE_PATH = ROOT / "router_classifier.py"
spec = importlib.util.spec_from_file_location("router_classifier", MODULE_PATH)
assert spec and spec.loader
router_classifier = importlib.util.module_from_spec(spec)
sys.modules["router_classifier"] = router_classifier
spec.loader.exec_module(router_classifier)
class FakeClient:
def embed(self, texts, *, purpose="query"):
# Deterministic toy embeddings based on keyword buckets. The tests focus on
# rule safety and API shape; live smoke tests cover the real NPU upstream.
vectors = []
for text in texts:
t = text.lower()
vec = [0.0] * 8
if any(w in t for w in ["time", "current", "weather", "news", "port", "git", "logs", "systemd"]):
vec[0] = 1.0
if any(w in t for w in ["remember", "prefer", "preference"]):
vec[1] = 1.0
if any(w in t for w in ["urgent", "down", "outage", "critical"]):
vec[2] = 1.0
if any(w in t for w in ["code", "pytest", "debug", "git", "diff"]):
vec[3] = 1.0
if any(w in t for w in ["service", "systemd", "port", "gateway", "docker"]):
vec[4] = 1.0
if any(w in t for w in ["kanban", "task", "blocked", "review"]):
vec[5] = 1.0
if any(w in t for w in ["light", "thermostat"]):
vec[6] = 1.0
if any(w in t for w in ["transcribe", "voice", "memo", "audio"]):
vec[7] = 1.0
if not any(vec):
vec[0] = 0.2
vectors.append(vec)
return router_classifier.EmbedResult(vectors=vectors, npu_busy_delta_us=123, duration_ms=1.0, embedding_dim=8)
class RouterClassifierTests(unittest.TestCase):
def service(self):
svc = router_classifier.ClassifierService("http://fake.local/v1/embeddings")
svc.client = FakeClient()
svc.warmup()
return svc
def test_health_and_label_schema(self):
svc = self.service()
health = svc.health()
self.assertEqual(health["service"], "atlas-router-classifier")
self.assertEqual(health["mode"], "dry_run")
self.assertIn("tool_needed", health["labels"])
labels = svc.labels()
self.assertIn("workflow_category", labels["enums"])
self.assertIn("safety_confirmation_required", labels["thresholds"])
def test_explicit_preference_is_memory_candidate(self):
result = self.service().classify("pref", "Remember that I prefer concise terminal replies.")
self.assertEqual(result["labels"]["memory_candidate"]["value"], "user_preference")
self.assertGreaterEqual(result["labels"]["memory_candidate"]["confidence"], 0.78)
self.assertFalse(result["labels"]["safety_confirmation_required"]["value"])
def test_current_local_state_needs_tool(self):
result = self.service().classify("port", "Check whether port 18819 is listening and inspect systemd logs.")
self.assertTrue(result["labels"]["tool_needed"]["value"])
self.assertIn("local_state_requested", result["labels"]["tool_needed"]["reason_codes"])
def test_live_gateway_restart_requires_confirmation(self):
result = self.service().classify("safe", "Restart the live Atlas gateway and switch primary routing.")
self.assertTrue(result["labels"]["safety_confirmation_required"]["value"])
self.assertIn("live_service_or_routing_change", result["labels"]["safety_confirmation_required"]["reason_codes"])
def test_batch_shape(self):
result = self.service().batch_classify([
{"id": "a", "text": "What time is it?"},
{"id": "b", "text": "Delete the existing collection and reindex it in place."},
])
self.assertEqual(result["model"], router_classifier.MODEL)
self.assertEqual(len(result["results"]), 2)
self.assertGreater(result["npu_busy_delta_us"], 0)
def test_fixture_file_is_valid_jsonl(self):
fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl"
rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()]
self.assertGreaterEqual(len(rows), 8)
for row in rows:
self.assertIn("id", row)
self.assertIn("text", row)
self.assertIn("expected", row)
if __name__ == "__main__":
unittest.main()
+159
View File
@@ -0,0 +1,159 @@
# OpenVINO NPU document/image triage prototype
Local-only prototype for triaging screenshots, photos/scans, and PDF page images.
It returns structured JSON metadata and explicitly reports CPU vs NPU stages.
Location: `/home/will/lab/swarm/openvino-doc-image-triage-npu/`
## Privacy and safety
- No external uploads.
- The only network call is optional localhost-only embeddings at `127.0.0.1:18817`.
- Raw OCR/sidecar text is redacted by default and is not logged.
- Full source paths are omitted by default; responses include basename and SHA-256.
- Allowed roots are enforced for CLI/server requests.
- This prototype does not mutate Obsidian, RAG, Chroma, vector collections, routing, or gateway services.
## CPU vs NPU stages
CPU:
- file intake, allowed-root checks, size checks, hashing
- image/PDF decoding/rendering and normalization
- optional local text extraction from sidecars or PDF text libraries
- regex metadata extraction and rule-based category fallback
- final needs-attention rules
NPU:
- needs-attention semantic embedding, via existing local OpenVINO embeddings service on `:18817`
- verified with `/sys/class/accel/accel0/device/npu_busy_time_us` before/after each embedding call
Not configured in v1:
- image category classifier on NPU. The JSON reports this as `CPU rule fallback (NPU model not configured in prototype v1)`. A future task can add a static-shape MobileNet/EfficientNet/ResNet OpenVINO IR model.
- OCR on NPU. OCR remains CPU/local plumbing in v1.
## Files
- `triage.py` — core library and CLI.
- `server.py` — stdlib HTTP server with `/healthz`, `/models`, `/triage`, `/triage/batch`.
- `make_samples.py` — creates synthetic non-private image/PDF samples.
- `tests/smoke_test.py` — end-to-end smoke test, including NPU busy-time verification when `:18817` is reachable.
- `samples/` — generated synthetic fixtures.
## Requirements
Use the existing NPU venv when available:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python -m pip install pillow
```
`pillow` is already present in the discovered `/home/will/.venvs/npu`. Optional local PDF text/rendering improves PDF support:
```bash
/home/will/.venvs/npu/bin/python -m pip install pypdf pypdfium2
```
The smoke tests do not require external services except the existing localhost `:18817` embeddings service for positive NPU verification.
## CLI usage
Generate synthetic samples:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python make_samples.py
```
Triage local files:
```bash
/home/will/.venvs/npu/bin/python triage.py \
--allowed-root /home/will/lab/swarm/openvino-doc-image-triage-npu \
--pretty \
samples/synthetic_invoice.png samples/synthetic_invoice.pdf
```
Disable the local NPU embeddings call if needed:
```bash
/home/will/.venvs/npu/bin/python triage.py --no-embeddings --allowed-root "$PWD" samples/synthetic_receipt.png
```
Include OCR/sidecar text in a single response only when explicitly requested:
```bash
/home/will/.venvs/npu/bin/python triage.py --include-ocr-text --allowed-root "$PWD" samples/synthetic_invoice.png
```
## HTTP usage
Check that port 18820 is free first:
```bash
ss -ltnp | grep ':18820\b' || true
```
Start local-only server:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18820 --allowed-root "$PWD"
```
Call it:
```bash
curl -sS http://127.0.0.1:18820/healthz | jq
curl -sS http://127.0.0.1:18820/models | jq
curl -sS -X POST http://127.0.0.1:18820/triage \
-H 'Content-Type: application/json' \
-d '{"path":"/home/will/lab/swarm/openvino-doc-image-triage-npu/samples/synthetic_invoice.png","options":{"allowed_roots":["/home/will/lab/swarm/openvino-doc-image-triage-npu"]}}' | jq
```
## Smoke test
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python tests/smoke_test.py
```
Expected: JSON ending with `"ok": true`. If the embeddings service is up, the result should show positive NPU busy-time delta and each embedded page should report `verified_npu: true`.
## Example output shape
```json
{
"file_id": "sha256:...",
"source_path_basename": "synthetic_invoice.png",
"media_type": "image",
"page_count": 1,
"pages": [
{
"page_index": 0,
"classification": {
"label": "bill_or_invoice",
"confidence": 0.71,
"device": "CPU",
"method": "rule_based_fallback"
},
"needs_attention": {
"value": true,
"device": "NPU+CPU",
"reasons": ["amount_due", "due_date_present"],
"embedding": {"verified_npu": true, "npu_busy_delta_us": 12345}
},
"metadata": {"dates_count": 1, "amounts_count": 1, "raw_values_redacted": true},
"ocr": {"available": true, "device": "CPU"}
}
],
"processing_device_summary": {
"file_intake": "CPU",
"image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)",
"needs_attention_embedding": "NPU via local :18817",
"metadata_extraction": "CPU",
"npu_verified": true
},
"privacy": {"external_uploads": false, "raw_text_logged": false}
}
```
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
from __future__ import annotations
from pathlib import Path
from PIL import Image, ImageDraw, ImageFilter
ROOT = Path(__file__).resolve().parent
SAMPLES = ROOT / "samples"
def make_doc(path: Path, lines: list[str], size=(900, 1200), rotate: int = 0, blur: bool = False) -> None:
img = Image.new("RGB", size, "white")
draw = ImageDraw.Draw(img)
y = 70
for line in lines:
draw.text((70, y), line, fill="black")
y += 55
draw.rectangle((55, 50, size[0] - 55, min(size[1] - 50, y + 30)), outline="gray", width=3)
if blur:
img = img.filter(ImageFilter.GaussianBlur(2.5))
if rotate:
img = img.rotate(rotate, expand=True, fillcolor="white")
img.save(path)
path.with_suffix(path.suffix + ".txt").write_text("\n".join(lines) + "\n")
def main() -> int:
SAMPLES.mkdir(exist_ok=True)
make_doc(SAMPLES / "synthetic_invoice.png", [
"ACME Utilities Invoice",
"Invoice No: INV-2026-0604",
"Amount Due: $123.45",
"Payment due 2026-06-30",
"Please submit payment by the due date.",
])
make_doc(SAMPLES / "synthetic_receipt.png", [
"Neighborhood Store Receipt",
"Subtotal $14.20",
"Tax $1.42",
"Total $15.62",
"Thank you for shopping",
], size=(720, 1100), rotate=3)
make_doc(SAMPLES / "synthetic_conversation.png", [
"Messages with Alex",
"Can you please respond by tomorrow?",
"Need signature on the form before Friday.",
], size=(1200, 750))
make_doc(SAMPLES / "synthetic_sensitive_form.png", [
"Sample Government Form - Fake Data",
"Applicant: Test Person",
"SSN: 123-45-6789",
"Signature required",
"Submit by Jan 15, 2027",
], blur=False)
make_doc(SAMPLES / "synthetic_blurry.png", [
"Low resolution blurred sample",
"No action required",
], size=(360, 250), blur=True)
# PIL can save a simple local PDF from a synthetic page. This is non-private.
pdf_img = Image.open(SAMPLES / "synthetic_invoice.png").convert("RGB")
pdf_img.save(SAMPLES / "synthetic_invoice.pdf", "PDF")
(SAMPLES / "synthetic_invoice.pdf.txt").write_text((SAMPLES / "synthetic_invoice.png.txt").read_text())
print(f"wrote samples under {SAMPLES}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
Binary file not shown.

After

Width:  |  Height:  |  Size: 4.5 KiB

@@ -0,0 +1,2 @@
Low resolution blurred sample
No action required
Binary file not shown.

After

Width:  |  Height:  |  Size: 9.1 KiB

@@ -0,0 +1,3 @@
Messages with Alex
Can you please respond by tomorrow?
Need signature on the form before Friday.
@@ -0,0 +1,5 @@
ACME Utilities Invoice
Invoice No: INV-2026-0604
Amount Due: $123.45
Payment due 2026-06-30
Please submit payment by the due date.
Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

@@ -0,0 +1,5 @@
ACME Utilities Invoice
Invoice No: INV-2026-0604
Amount Due: $123.45
Payment due 2026-06-30
Please submit payment by the due date.
Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

@@ -0,0 +1,5 @@
Neighborhood Store Receipt
Subtotal $14.20
Tax $1.42
Total $15.62
Thank you for shopping
Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

@@ -0,0 +1,5 @@
Sample Government Form - Fake Data
Applicant: Test Person
SSN: 123-45-6789
Signature required
Submit by Jan 15, 2027
+178
View File
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""Stdlib localhost HTTP wrapper for the triage prototype.
Endpoints:
- GET /healthz
- GET /models
- POST /triage JSON: {"path":"/local/file", "options": {...}}
- POST /triage/batch JSON: {"paths":["/local/file"], "options": {...}}
The server binds to 127.0.0.1 by default and accepts only local file paths under
configured allowed roots. It never uploads document/image contents externally.
"""
from __future__ import annotations
import argparse
import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from triage import DEFAULT_EMBED_URL, TriageOptions, read_npu_busy, triage_batch, triage_file
def _roots_within_configured(requested_roots: list[Any], configured_roots: list[Path]) -> list[Path]:
"""Return request roots only when they narrow the startup allowlist."""
narrowed: list[Path] = []
configured = [root.expanduser().resolve() for root in configured_roots]
for raw in requested_roots:
candidate = Path(str(raw)).expanduser().resolve()
if any(candidate == root or candidate.is_relative_to(root) for root in configured):
narrowed.append(candidate)
else:
raise ValueError("requested allowed_roots must be within configured allowed roots")
return narrowed
def _validated_embedding_url(raw_url: Any) -> str:
"""Allow only the configured local loopback embeddings service."""
url = str(raw_url)
parsed = urlparse(url)
host = parsed.hostname or ""
if (
parsed.scheme == "http"
and host in {"127.0.0.1", "localhost", "::1"}
and (parsed.port or 80) == 18817
and parsed.path == "/v1/embeddings"
and not parsed.username
and not parsed.password
):
return url
raise ValueError("embedding_url override must target the configured local loopback embeddings service")
def make_options(payload: dict[str, Any], default_roots: list[Path]) -> TriageOptions:
opts = payload.get("options") or {}
requested_roots = opts.get("allowed_roots", [])
if requested_roots:
if not isinstance(requested_roots, list):
raise ValueError("allowed_roots must be a list")
roots = _roots_within_configured(requested_roots, default_roots)
else:
roots = default_roots
embedding_url = DEFAULT_EMBED_URL
if "embedding_url" in opts:
embedding_url = _validated_embedding_url(opts["embedding_url"])
return TriageOptions(
max_pages=int(opts.get("max_pages", 3)),
include_ocr_text=bool(opts.get("include_ocr_text", False)),
dry_run=bool(opts.get("dry_run", False)),
use_embeddings=bool(opts.get("use_embeddings", True)),
embedding_url=embedding_url,
allowed_roots=roots,
include_full_path=bool(opts.get("include_full_path", False)),
)
class Handler(BaseHTTPRequestHandler):
server_version = "openvino-doc-image-triage-npu/0.1"
def _json(self, status: int, body: dict[str, Any]) -> None:
data = json.dumps(body, sort_keys=True).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(data)))
self.end_headers()
self.wfile.write(data)
def log_message(self, format: str, *args: Any) -> None:
# Do not log request bodies, OCR text, or file paths.
return
@property
def allowed_roots(self) -> list[Path]:
return self.server.allowed_roots # type: ignore[attr-defined]
def do_GET(self) -> None: # noqa: N802
if self.path in ("/", "/healthz", "/health"):
self._json(200, {
"ok": True,
"service": "openvino-doc-image-triage-npu",
"bind_policy": "localhost-default",
"npu_busy_time_us": read_npu_busy(),
"npu_busy_check_enabled": True,
"allowed_roots": [str(p) for p in self.allowed_roots],
"privacy": {"external_uploads": False, "raw_text_logged": False},
})
return
if self.path == "/models":
self._json(200, {
"models": [
{
"stage": "needs_attention_embedding",
"model": "bge-base-en-v1.5-int8-ov via local :18817",
"target_device": "NPU",
"verification": "sysfs npu_busy_time_us before/after embedding call",
},
{
"stage": "image_category_classification",
"model": "rule-based fallback in prototype v1",
"target_device": "CPU",
"npu_status": "not configured; future static-shape MobileNet/EfficientNet/ResNet OV IR",
},
{"stage": "ocr_text_extraction", "model": "optional local sidecar/PDF text", "target_device": "CPU"},
]
})
return
self._json(404, {"ok": False, "error": "not_found"})
def _read_payload(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length", "0"))
if length > 512 * 1024:
raise ValueError("request JSON too large")
raw = self.rfile.read(length)
if not raw:
return {}
return json.loads(raw.decode())
def do_POST(self) -> None: # noqa: N802
try:
payload = self._read_payload()
options = make_options(payload, self.allowed_roots)
if self.path == "/triage":
path = payload.get("path")
if not path:
self._json(400, {"ok": False, "error": "missing_path"})
return
self._json(200, {"ok": True, "result": triage_file(path, options)})
return
if self.path == "/triage/batch":
paths = payload.get("paths") or []
if not isinstance(paths, list) or not paths:
self._json(400, {"ok": False, "error": "missing_paths"})
return
self._json(200, triage_batch([str(p) for p in paths], options))
return
self._json(404, {"ok": False, "error": "not_found"})
except Exception as exc:
self._json(400, {"ok": False, "error": type(exc).__name__, "message": str(exc)})
def main() -> int:
parser = argparse.ArgumentParser(description="Local-only doc/image triage HTTP server")
parser.add_argument("--host", default=os.environ.get("DOC_IMAGE_TRIAGE_HOST", "127.0.0.1"))
parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18820")))
parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; may repeat")
args = parser.parse_args()
roots = [Path(p).expanduser().resolve() for p in args.allowed_root] or [Path.cwd().resolve()]
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.allowed_roots = roots # type: ignore[attr-defined]
print(json.dumps({"service": "openvino-doc-image-triage-npu", "host": args.host, "port": args.port, "allowed_roots": [str(p) for p in roots]}), flush=True)
httpd.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SAMPLES = ROOT / "samples"
BUSY = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def run(cmd: list[str]) -> None:
print("+", " ".join(cmd))
subprocess.run(cmd, cwd=ROOT, check=True)
def post_json(url: str, payload: dict) -> dict:
req = urllib.request.Request(url, data=json.dumps(payload).encode(), headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode())
def post_json_status(url: str, payload: dict) -> tuple[int, dict]:
req = urllib.request.Request(url, data=json.dumps(payload).encode(), headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return resp.status, json.loads(resp.read().decode())
except urllib.error.HTTPError as exc:
return exc.code, json.loads(exc.read().decode())
def busy() -> int | None:
try:
return int(BUSY.read_text().strip())
except Exception:
return None
def main() -> int:
run([sys.executable, "make_samples.py"])
invoice = SAMPLES / "synthetic_invoice.png"
pdf = SAMPLES / "synthetic_invoice.pdf"
before = busy()
raw = subprocess.check_output([
sys.executable, "triage.py", "--allowed-root", str(ROOT), "--pretty", str(invoice), str(pdf)
], cwd=ROOT, text=True)
data = json.loads(raw)
assert data["ok"], data
first = data["files"][0]["result"]
assert first["privacy"]["external_uploads"] is False
assert first["pages"][0]["classification"]["label"] == "bill_or_invoice"
assert first["pages"][0]["needs_attention"]["value"] is True
assert "amount_due" in first["pages"][0]["needs_attention"]["reasons"]
assert first["processing_device_summary"]["file_intake"] == "CPU"
assert "NPU" in first["processing_device_summary"]["needs_attention_embedding"] or first["pages"][0]["needs_attention"]["device"] == "CPU"
after = busy()
if before is not None and after is not None:
# If :18817 is reachable and text was embedded, NPU delta must be positive.
emb = first["pages"][0]["needs_attention"]["embedding"]
if emb.get("used"):
assert emb.get("verified_npu") is True, emb
assert (emb.get("npu_busy_delta_us") or 0) > 0, emb
assert after > before, {"before": before, "after": after, "embedding": emb}
# HTTP smoke on an ephemeral localhost port so we do not collide with 18820 during tests.
proc = subprocess.Popen([sys.executable, "server.py", "--host", "127.0.0.1", "--port", "18828", "--allowed-root", str(ROOT)], cwd=ROOT, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
try:
deadline = time.time() + 5
while time.time() < deadline:
try:
health = urllib.request.urlopen("http://127.0.0.1:18828/healthz", timeout=1).read()
assert b"openvino-doc-image-triage-npu" in health
break
except Exception:
time.sleep(0.1)
else:
raise AssertionError("server did not become ready")
resp = post_json("http://127.0.0.1:18828/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}})
assert resp["ok"] is True, resp
assert resp["result"]["source_path_basename"] == "synthetic_invoice.png"
assert "source_path" not in resp["result"]
# Request bodies may narrow but must not widen the startup --allowed-root policy.
with tempfile.NamedTemporaryFile(suffix=".txt") as outside:
outside.write(b"sensitive text outside configured artifact root")
outside.flush()
status, blocked = post_json_status(
"http://127.0.0.1:18828/triage",
{"path": outside.name, "options": {"allowed_roots": ["/tmp"], "dry_run": True, "use_embeddings": False}},
)
assert status == 400, blocked
assert blocked["ok"] is False, blocked
assert "allowed_roots" in blocked.get("message", ""), blocked
# Request bodies must not redirect extracted text to caller-supplied endpoints.
status, blocked = post_json_status(
"http://127.0.0.1:18828/triage",
{"path": str(invoice), "options": {"embedding_url": "http://198.51.100.1:9/v1/embeddings"}},
)
assert status == 400, blocked
assert blocked["ok"] is False, blocked
assert "embedding_url" in blocked.get("message", ""), blocked
finally:
proc.terminate()
proc.wait(timeout=5)
print(json.dumps({
"ok": True,
"samples": len(list(SAMPLES.glob("synthetic_*"))),
"npu_busy_before": before,
"npu_busy_after": after,
"npu_delta_observed": None if before is None or after is None else after - before,
"triage_label": first["pages"][0]["classification"]["label"],
"needs_attention": first["pages"][0]["needs_attention"]["value"],
}, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+459
View File
@@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""Local-only document/image triage prototype.
CPU stages:
- local file intake, hashing, MIME/extension checks
- image/PDF-page decoding and normalization
- optional sidecar/native-text extraction
- regex metadata extraction and rule-based category fallback
NPU stages:
- needs-attention semantic embedding via the existing local OpenVINO NPU
embeddings service on 127.0.0.1:18817, verified by sysfs busy-time delta.
No external uploads are performed. The only network call is localhost to the
embedding service when enabled.
"""
from __future__ import annotations
import argparse
import base64
import dataclasses
import datetime as dt
import hashlib
import io
import json
import mimetypes
import os
import re
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
try:
from PIL import Image, ImageOps
except Exception as exc: # pragma: no cover - caught in CLI smoke
raise SystemExit("Pillow is required: install pillow in the active Python env") from exc
NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
DEFAULT_ALLOWED_ROOTS = [Path.cwd()]
MAX_FILE_BYTES = 25 * 1024 * 1024
CATEGORY_LABELS = [
"receipt",
"bill_or_invoice",
"tax_or_financial",
"medical_or_insurance",
"legal_or_government",
"form_or_application",
"travel_or_ticket",
"screenshot_conversation",
"screenshot_web_or_app",
"identity_or_sensitive",
"photo_misc",
"unknown_or_low_confidence",
]
DATE_PATTERNS = [
re.compile(r"\b(20\d{2}[-/](?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01]))\b"),
re.compile(r"\b((?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01])[-/](?:20)?\d{2})\b"),
re.compile(r"\b((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+20\d{2})\b", re.I),
]
AMOUNT_RE = re.compile(r"(?<!\w)(?:USD\s*)?\$\s?\d{1,4}(?:,\d{3})*(?:\.\d{2})?\b", re.I)
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b")
PHONE_RE = re.compile(r"\b(?:\+?1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){2}\d{4}\b")
ACCOUNT_RE = re.compile(r"\b(?:account|acct|policy|invoice|member|case|claim)\s*(?:#|no\.?|id)?\s*[:\-]?\s*[A-Z0-9-]{4,}\b", re.I)
SSN_LIKE_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
ATTENTION_KEYWORDS = {
"due_date_present": ["due", "payment due", "pay by", "deadline"],
"amount_due": ["amount due", "balance due", "total due", "$"],
"action_required_language": ["action required", "please respond", "complete", "submit", "renew", "verify"],
"signature_required": ["signature", "sign and return", "signed"],
"appointment_or_deadline": ["appointment", "scheduled", "reservation", "hearing"],
"account_security": ["security", "password", "unauthorized", "fraud", "verify your account"],
"medical_followup": ["follow up", "lab result", "referral", "insurance"],
"tax_deadline": ["irs", "tax", "1099", "w-2", "deadline"],
}
CATEGORY_KEYWORDS = {
"receipt": ["receipt", "subtotal", "cashier", "change", "store"],
"bill_or_invoice": ["invoice", "amount due", "balance due", "statement", "payment due"],
"tax_or_financial": ["tax", "irs", "1099", "w-2", "bank", "routing"],
"medical_or_insurance": ["medical", "insurance", "clinic", "patient", "claim"],
"legal_or_government": ["court", "government", "department", "notice", "license"],
"form_or_application": ["application", "form", "signature", "submit"],
"travel_or_ticket": ["boarding", "ticket", "itinerary", "reservation", "gate"],
"screenshot_conversation": ["message", "chat", "reply", "conversation"],
"screenshot_web_or_app": ["login", "browser", "app", "settings", "dashboard"],
"identity_or_sensitive": ["ssn", "passport", "driver license", "social security"],
}
@dataclasses.dataclass
class TriageOptions:
max_pages: int = 3
include_ocr_text: bool = False
dry_run: bool = False
use_embeddings: bool = True
embedding_url: str = DEFAULT_EMBED_URL
allowed_roots: list[Path] = dataclasses.field(default_factory=lambda: DEFAULT_ALLOWED_ROOTS.copy())
include_full_path: bool = False
timeout_seconds: float = 10.0
def read_npu_busy() -> int | None:
try:
return int(NPU_BUSY_PATH.read_text().strip())
except Exception:
return None
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with path.open("rb") as f:
for chunk in iter(lambda: f.read(1024 * 1024), b""):
h.update(chunk)
return h.hexdigest()
def under_allowed_root(path: Path, roots: list[Path]) -> bool:
resolved = path.resolve()
for root in roots:
try:
resolved.relative_to(root.resolve())
return True
except ValueError:
continue
return False
def sidecar_text(path: Path) -> tuple[str, str | None]:
for suffix in (path.suffix + ".txt", ".txt"):
candidate = path.with_suffix(suffix) if suffix.startswith(path.suffix) else path.with_suffix(suffix)
if candidate.exists() and candidate.is_file():
try:
return candidate.read_text(errors="replace")[:12000], f"sidecar:{candidate.name}"
except Exception:
return "", "sidecar_unreadable"
return "", None
def extract_pdf_text(path: Path, max_pages: int) -> tuple[str, str | None]:
# Optional dependency; tests do not require it. Keeps PDF support local-only when installed.
try:
import pypdf # type: ignore
except Exception:
return "", "pypdf_not_installed"
try:
reader = pypdf.PdfReader(str(path))
if getattr(reader, "is_encrypted", False):
return "", "pdf_encrypted"
chunks = []
for page in reader.pages[:max_pages]:
chunks.append(page.extract_text() or "")
return "\n".join(chunks)[:12000], "pypdf_cpu"
except Exception as exc:
return "", f"pdf_text_error:{type(exc).__name__}"
def load_image_pages(path: Path, max_pages: int) -> tuple[list[Image.Image], str | None]:
ext = path.suffix.lower()
if ext == ".pdf":
try:
import pypdfium2 as pdfium # type: ignore
except Exception:
return [], "pypdfium2_not_installed"
try:
pdf = pdfium.PdfDocument(str(path))
pages = []
for i in range(min(len(pdf), max_pages)):
bitmap = pdf[i].render(scale=1.5)
pages.append(bitmap.to_pil().convert("RGB"))
return pages, None
except Exception as exc:
return [], f"pdf_render_error:{type(exc).__name__}"
try:
img = Image.open(path)
img = ImageOps.exif_transpose(img).convert("RGB")
return [img], None
except Exception as exc:
return [], f"image_decode_error:{type(exc).__name__}"
def normalize_for_hash_features(img: Image.Image) -> dict[str, Any]:
small = ImageOps.contain(img.copy(), (224, 224))
gray = small.convert("L")
hist = gray.histogram()
pixels = max(1, gray.width * gray.height)
mean = sum(i * c for i, c in enumerate(hist)) / pixels
variance = sum(((i - mean) ** 2) * c for i, c in enumerate(hist)) / pixels
return {
"mean_luma": round(mean, 2),
"contrast": round(variance ** 0.5, 2),
"aspect_ratio": round(img.width / max(1, img.height), 3),
}
def classify_rule(text: str, image_features: dict[str, Any]) -> dict[str, Any]:
t = text.lower()
best_label = "unknown_or_low_confidence"
best_score = 0
for label, words in CATEGORY_KEYWORDS.items():
score = sum(1 for word in words if word in t)
if score > best_score:
best_label, best_score = label, score
if best_score == 0:
ar = image_features.get("aspect_ratio", 1.0)
if ar > 1.3:
best_label, best_score = "screenshot_web_or_app", 1
else:
best_label, best_score = "unknown_or_low_confidence", 0
confidence = min(0.35 + 0.18 * best_score, 0.92) if best_score else 0.2
if confidence < 0.45:
best_label = "unknown_or_low_confidence"
return {
"label": best_label,
"confidence": round(confidence, 3),
"device": "CPU",
"stage": "category_classification",
"method": "rule_based_fallback",
"npu_status": "not_configured_for_prototype_v1",
"candidate_labels": CATEGORY_LABELS,
}
def extract_metadata(text: str) -> dict[str, Any]:
dates = []
for pat in DATE_PATTERNS:
dates.extend(m.group(1) for m in pat.finditer(text))
amounts = AMOUNT_RE.findall(text)
flags = {
"org_present": bool(re.search(r"\b(?:inc|llc|clinic|department|bank|insurance|store)\b", text, re.I)),
"address_present": bool(re.search(r"\b\d{2,5}\s+[A-Za-z0-9 .]+\s+(?:st|street|ave|avenue|rd|road|blvd|drive|dr)\b", text, re.I)),
"phone_present": bool(PHONE_RE.search(text)),
"email_present": bool(EMAIL_RE.search(text)),
"policy_or_account_id_present": bool(ACCOUNT_RE.search(text)),
"identity_number_like_present": bool(SSN_LIKE_RE.search(text)),
}
return {
"dates_count": len(set(dates)),
"amounts_count": len(set(amounts)),
"detected_entities": flags,
"raw_values_redacted": True,
}
def call_embeddings(text: str, url: str, timeout: float) -> dict[str, Any]:
if not text.strip():
return {"used": False, "device": "NPU", "status": "skipped_no_text", "npu_busy_delta_us": 0}
before = read_npu_busy()
payload = json.dumps({"input": text[:2048], "purpose": "document"}).encode()
req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
t0 = time.perf_counter()
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read(1024 * 1024)
status = resp.status
parsed = json.loads(body.decode())
dim = None
if isinstance(parsed, dict) and parsed.get("data"):
emb = parsed["data"][0].get("embedding", [])
dim = len(emb) if isinstance(emb, list) else None
after = read_npu_busy()
delta = (after - before) if before is not None and after is not None else None
return {
"used": True,
"device": "NPU",
"status": "ok" if status == 200 else f"http_{status}",
"embedding_dim": dim,
"wall_ms": round((time.perf_counter() - t0) * 1000, 2),
"npu_busy_delta_us": delta,
"verified_npu": bool(delta and delta > 0),
"endpoint": "127.0.0.1:18817",
}
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
after = read_npu_busy()
delta = (after - before) if before is not None and after is not None else None
return {
"used": False,
"device": "NPU",
"status": f"embedding_service_error:{type(exc).__name__}",
"npu_busy_delta_us": delta,
"verified_npu": False,
"endpoint": "127.0.0.1:18817",
}
def needs_attention(text: str, embedding_result: dict[str, Any]) -> dict[str, Any]:
t = text.lower()
reasons = []
for reason, words in ATTENTION_KEYWORDS.items():
if any(word in t for word in words):
reasons.append(reason)
meta = extract_metadata(text)
if meta["amounts_count"]:
reasons.append("amount_due")
if meta["dates_count"]:
reasons.append("due_date_present")
reasons = sorted(set(reasons))
value = bool(reasons)
confidence = min(0.45 + 0.1 * len(reasons), 0.9) if value else 0.35
if embedding_result.get("verified_npu"):
confidence = min(confidence + 0.05, 0.95)
return {
"value": value,
"confidence": round(confidence, 3),
"reasons": reasons or (["low_confidence"] if not text.strip() else []),
"device": "NPU+CPU" if embedding_result.get("used") else "CPU",
"stage": "needs_attention",
"method": "NPU embedding verification + CPU rules" if embedding_result.get("used") else "CPU rules fallback",
"embedding": embedding_result,
}
def infer_media_type(path: Path, is_pdf_page: bool = False) -> str:
if is_pdf_page:
return "pdf_page"
mt, _ = mimetypes.guess_type(path.name)
if path.suffix.lower() == ".pdf":
return "pdf"
if mt and mt.startswith("image/"):
return "image"
return "unknown"
def triage_file(path_like: str | Path, options: TriageOptions | None = None) -> dict[str, Any]:
options = options or TriageOptions()
path = Path(path_like).expanduser()
resolved = path.resolve()
if not under_allowed_root(resolved, options.allowed_roots):
raise ValueError(f"path is outside allowed roots: {path}")
if not resolved.exists() or not resolved.is_file():
raise FileNotFoundError(str(path))
size = resolved.stat().st_size
if size > MAX_FILE_BYTES:
raise ValueError(f"file too large for prototype limit: {size} bytes")
file_hash = sha256_file(resolved)
text, text_source = sidecar_text(resolved)
pdf_text_status = None
if resolved.suffix.lower() == ".pdf" and not text:
text, pdf_text_status = extract_pdf_text(resolved, options.max_pages)
text_source = pdf_text_status
pages: list[dict[str, Any]] = []
render_error = None
if not options.dry_run:
images, render_error = load_image_pages(resolved, options.max_pages)
else:
images = []
if not images and options.dry_run:
images = []
elif not images:
# Return a file-level record even if PDF rendering is unavailable.
images = []
embedding_result = call_embeddings(text, options.embedding_url, options.timeout_seconds) if options.use_embeddings else {"used": False, "device": "NPU", "status": "disabled", "npu_busy_delta_us": 0, "verified_npu": False}
attn = needs_attention(text, embedding_result)
meta = extract_metadata(text)
if images:
for idx, img in enumerate(images):
features = normalize_for_hash_features(img)
classification = classify_rule(text, features)
pages.append({
"page_index": idx,
"media_type": infer_media_type(resolved, resolved.suffix.lower() == ".pdf"),
"image": {"width": img.width, "height": img.height, "orientation": "portrait" if img.height >= img.width else "landscape", **features},
"classification": classification,
"needs_attention": attn,
"metadata": meta,
"ocr": {"available": bool(text), "quality": 0.7 if text else 0.0, "device": "CPU", "text_source": text_source},
})
else:
classification = classify_rule(text, {"aspect_ratio": 1.0})
pages.append({
"page_index": 0,
"media_type": infer_media_type(resolved, resolved.suffix.lower() == ".pdf"),
"image": {"width": None, "height": None, "orientation": None, "render_error": render_error},
"classification": classification,
"needs_attention": attn,
"metadata": meta,
"ocr": {"available": bool(text), "quality": 0.7 if text else 0.0, "device": "CPU", "text_source": text_source},
})
result: dict[str, Any] = {
"file_id": f"sha256:{file_hash}",
"source_path_basename": resolved.name,
"media_type": infer_media_type(resolved),
"file_size_bytes": size,
"page_count": len(pages),
"pages": pages,
"processing_device_summary": {
"file_intake": "CPU",
"pdf_rendering": "CPU" if resolved.suffix.lower() == ".pdf" else "not_applicable",
"image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)",
"ocr_text_extraction": "CPU/local sidecar or optional local PDF text extractor",
"needs_attention_embedding": "NPU via local :18817" if embedding_result.get("used") else "CPU fallback/no text",
"metadata_extraction": "CPU",
"npu_verified": bool(embedding_result.get("verified_npu")),
"npu_busy_delta_us": embedding_result.get("npu_busy_delta_us"),
},
"privacy": {
"external_uploads": False,
"localhost_only_embedding_call": bool(options.use_embeddings),
"raw_text_logged": False,
"raw_values_redacted": True,
"full_path_included": options.include_full_path,
},
"errors": [e for e in [render_error, pdf_text_status if pdf_text_status and not text else None] if e],
}
if options.include_full_path:
result["source_path"] = str(resolved)
if options.include_ocr_text:
result["ocr_text"] = text
return result
def triage_batch(paths: list[str], options: TriageOptions | None = None) -> dict[str, Any]:
items = []
for p in paths:
try:
items.append({"ok": True, "result": triage_file(p, options)})
except Exception as exc:
items.append({"ok": False, "source_path_basename": Path(p).name, "error": type(exc).__name__, "message": str(exc)})
return {"ok": all(item["ok"] for item in items), "files": items, "generated_at": dt.datetime.now(dt.UTC).isoformat()}
def cli() -> int:
parser = argparse.ArgumentParser(description="Local document/image triage prototype")
parser.add_argument("paths", nargs="+", help="local image/PDF paths")
parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; defaults to cwd")
parser.add_argument("--max-pages", type=int, default=3)
parser.add_argument("--include-ocr-text", action="store_true")
parser.add_argument("--include-full-path", action="store_true")
parser.add_argument("--no-embeddings", action="store_true", help="disable local NPU embedding call")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--pretty", action="store_true")
args = parser.parse_args()
roots = [Path(p) for p in args.allowed_root] if args.allowed_root else [Path.cwd()]
options = TriageOptions(
max_pages=args.max_pages,
include_ocr_text=args.include_ocr_text,
dry_run=args.dry_run,
use_embeddings=not args.no_embeddings,
allowed_roots=roots,
include_full_path=args.include_full_path,
)
out = triage_batch(args.paths, options)
print(json.dumps(out, indent=2 if args.pretty else None, sort_keys=True))
return 0 if out["ok"] else 2
if __name__ == "__main__":
raise SystemExit(cli())
+111
View File
@@ -0,0 +1,111 @@
# OpenVINO GenAI NPU worker prototype
Local-only prototype for cheap bounded background generation on Will's Intel NPU. It is intentionally isolated from primary Atlas/Hermes routing.
## What it does
- Model: `OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov`.
- Runtime: `/home/will/.venvs/npu` with `openvino-genai==2026.2.0.0`.
- Device: OpenVINO GenAI `NPU`.
- Default bind: `127.0.0.1:18820`.
- Jobs: `title`, `summary`, `notification`, `memory_candidate`.
- Prompt/input limits: 6000 chars, `MAX_PROMPT_LEN=1024`, max 256 generated tokens.
The worker does not write memory, does not restart Atlas/Hermes, does not change primary routing, and does not log raw prompt bodies by default.
## Files
- `worker.py` — stdlib HTTP API plus CLI wrapper.
- `smoke_llm_npu.py` — direct GenAI smoke test with NPU busy-time verification.
- `systemd/openvino-genai-npu-worker.service` — optional user-service template; not installed by this prototype.
## Model/cache
Downloaded model path:
```text
/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov
```
OpenVINO compile cache path:
```text
/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4
```
NPU pipeline config used by the prototype:
```python
CACHE_DIR=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4
MAX_PROMPT_LEN=1024
MIN_RESPONSE_LEN=64
PREFILL_HINT=DYNAMIC
GENERATE_HINT=FAST_COMPILE
```
AOT/blob note: first milestone uses `CACHE_DIR` only. Do not switch to manual `EXPORT_BLOB`/`BLOB_PATH` until compile latency is proven to be the bottleneck. If explicit blobs are used later, record OpenVINO version, NPU compiler version, driver version, model id, quantization flags, and source weights path; invalidate blobs after OpenVINO/NPU driver upgrades.
## Direct smoke test
```bash
cd /home/will/lab/swarm/openvino-genai-npu-worker
/home/will/.venvs/npu/bin/python smoke_llm_npu.py
```
Acceptance requires `npu_busy_delta_us > 0`.
Observed cold-ish smoke after download/cache setup:
```json
{
"text": "\"Atlas Summarizes NPU Worker Options Requested by User\"",
"timing_ms": {"load": 10989.08, "generate": 3157.94, "total": 14147.02},
"npu_busy_delta_us": 2650724
}
```
## CLI usage
```bash
/home/will/.venvs/npu/bin/python worker.py \
--job title \
--input 'Kanban task asks for a small OpenVINO GenAI NPU worker prototype.'
```
## HTTP usage
Start locally only:
```bash
cd /home/will/lab/swarm/openvino-genai-npu-worker
/home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820
```
Endpoints:
```text
GET /healthz
GET /models
POST /v1/worker/generate
POST /v1/worker/extract-memory-candidates
POST /v1/worker/condense-notification
```
Example:
```bash
curl -s http://127.0.0.1:18820/v1/worker/generate \
-H 'Content-Type: application/json' \
-d '{"job":"summary","input":"Build a bounded local NPU worker for small generation tasks, no primary routing changes.","max_new_tokens":80}' \
| python -m json.tool
```
Response includes `npu_busy_delta_us`; treat zero as failure even if HTTP status is 200.
## Safety boundaries
- Binds only to `127.0.0.1` by default; non-local bind is refused in code.
- No raw request-body logging.
- No private external uploads.
- No Atlas/Hermes gateway restarts or primary model routing changes.
- NPU access is serialized with a process lock because the NPU is a shared resource with existing services.
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""Smoke-test OpenVINO GenAI LLMPipeline on Intel NPU.
This verifies NPU execution by reading /sys/class/accel/accel0/device/npu_busy_time_us
before and after generation. HTTP 200/service success is not considered proof.
"""
from __future__ import annotations
import argparse
import json
import time
from pathlib import Path
import openvino_genai as ov_genai
DEFAULT_MODEL = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_CACHE = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4"
BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def read_busy() -> int:
return int(BUSY_PATH.read_text().strip())
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--model", default=DEFAULT_MODEL)
parser.add_argument("--cache-dir", default=DEFAULT_CACHE)
parser.add_argument("--prompt", default="Write a concise title for: User asked Atlas to summarize NPU worker options.")
parser.add_argument("--max-new-tokens", type=int, default=24)
args = parser.parse_args()
model_path = Path(args.model)
cache_dir = Path(args.cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
if not model_path.exists():
raise SystemExit(f"model path does not exist: {model_path}")
config = {
"CACHE_DIR": str(cache_dir),
"MAX_PROMPT_LEN": 1024,
"MIN_RESPONSE_LEN": 64,
"PREFILL_HINT": "DYNAMIC",
"GENERATE_HINT": "FAST_COMPILE",
}
before = read_busy()
load_start = time.monotonic()
pipe = ov_genai.LLMPipeline(str(model_path), "NPU", config)
load_ms = round((time.monotonic() - load_start) * 1000, 2)
gen_start = time.monotonic()
output = pipe.generate(args.prompt, max_new_tokens=args.max_new_tokens)
gen_ms = round((time.monotonic() - gen_start) * 1000, 2)
after = read_busy()
result = {
"model": str(model_path),
"device": "NPU",
"cache_dir": str(cache_dir),
"prompt_chars": len(args.prompt),
"max_new_tokens": args.max_new_tokens,
"text": str(output).strip(),
"timing_ms": {"load": load_ms, "generate": gen_ms, "total": round(load_ms + gen_ms, 2)},
"npu_busy_before_us": before,
"npu_busy_after_us": after,
"npu_busy_delta_us": after - before,
}
print(json.dumps(result, indent=2))
return 0 if after > before else 2
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,16 @@
[Unit]
Description=OpenVINO GenAI NPU worker prototype
After=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/will/lab/swarm/openvino-genai-npu-worker
Environment=OV_GENAI_NPU_MODEL=/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov
Environment=OV_GENAI_NPU_CACHE=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4
Environment=OV_GENAI_NPU_PORT=18820
ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-genai-npu-worker/worker.py --host 127.0.0.1 --port 18820
Restart=on-failure
RestartSec=5
[Install]
WantedBy=default.target
+251
View File
@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""Local-only OpenVINO GenAI NPU worker.
Small bounded LLM worker for cheap background tasks. It intentionally does not
wire into Atlas/Hermes routing and does not log raw prompts by default.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, cast
from urllib.parse import urlparse
import openvino_genai as ov_genai # type: ignore[import-not-found]
MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4"
BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
HOST = "127.0.0.1"
PORT = 18820
MAX_INPUT_CHARS = 6000
DEFAULTS = {
"title": 32,
"summary": 160,
"memory_candidate": 192,
"notification": 96,
}
PROMPTS = {
"title": "Write one concise title, 8 words or fewer. Return only the title.\n\nInput:\n{input}",
"summary": "Summarize the input in one short paragraph or up to 4 bullets. Be factual and concise.\n\nInput:\n{input}",
"memory_candidate": (
"Extract durable memory candidates from the conversation excerpt. "
"Return strict JSON with keys: candidates (array of objects with fact, confidence, reason), notes. "
"Do not write memory; only propose candidates.\n\nInput:\n{input}"
),
"notification": (
"Condense this notification or log excerpt for a human. "
"Return JSON with keys: severity (info|warning|error), category, summary, action_needed.\n\nInput:\n{input}"
),
}
def read_busy() -> int:
return int(BUSY_PATH.read_text().strip())
def coerce_json(text: str) -> Any | None:
text = text.strip()
if not text:
return None
try:
return json.loads(text)
except json.JSONDecodeError:
match = re.search(r"(\{.*\}|\[.*\])", text, re.S)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
return None
return None
@dataclass
class GenerationResult:
text: str
parsed_json: Any | None
timing_ms: dict[str, float]
npu_busy_delta_us: int
npu_busy_before_us: int
npu_busy_after_us: int
class NpuWorker:
def __init__(self, model_path: str, cache_dir: str):
self.model_path = Path(model_path)
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self._pipe = None
self._load_ms: float | None = None
self._lock = threading.Lock()
self._loaded_at: float | None = None
if not self.model_path.exists():
raise FileNotFoundError(f"model path does not exist: {self.model_path}")
def load(self) -> None:
if self._pipe is not None:
return
start = time.monotonic()
# NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching.
self._pipe = ov_genai.LLMPipeline(
str(self.model_path),
"NPU",
CACHE_DIR=str(self.cache_dir),
MAX_PROMPT_LEN=1024,
MIN_RESPONSE_LEN=64,
PREFILL_HINT="DYNAMIC",
GENERATE_HINT="FAST_COMPILE",
)
self._load_ms = round((time.monotonic() - start) * 1000, 2)
self._loaded_at = time.time()
def generate(self, job: str, user_input: str, max_new_tokens: int | None = None) -> GenerationResult:
if job not in PROMPTS:
raise ValueError(f"unsupported job: {job}")
if not isinstance(user_input, str) or not user_input.strip():
raise ValueError("input must be a non-empty string")
if len(user_input) > MAX_INPUT_CHARS:
raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}")
max_new_tokens = int(max_new_tokens or DEFAULTS[job])
if max_new_tokens < 1 or max_new_tokens > 256:
raise ValueError("max_new_tokens must be between 1 and 256")
prompt = PROMPTS[job].format(input=user_input.strip())
with self._lock:
load_start = time.monotonic()
self.load()
load_ms = round((time.monotonic() - load_start) * 1000, 2)
before = read_busy()
gen_start = time.monotonic()
pipe = cast(Any, self._pipe)
text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip()
generate_ms = round((time.monotonic() - gen_start) * 1000, 2)
after = read_busy()
parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None
if job == "memory_candidate" and isinstance(parsed, list):
parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"}
return GenerationResult(
text=text,
parsed_json=parsed,
timing_ms={"load": load_ms, "initial_load": self._load_ms or 0.0, "generate": generate_ms, "total": round(load_ms + generate_ms, 2)},
npu_busy_delta_us=after - before,
npu_busy_before_us=before,
npu_busy_after_us=after,
)
def health(self) -> dict[str, Any]:
return {
"ok": True,
"model": MODEL_ID,
"model_path": str(self.model_path),
"device": "NPU",
"cache_dir": str(self.cache_dir),
"cache_exists": self.cache_dir.exists(),
"loaded": self._pipe is not None,
"initial_load_ms": self._load_ms,
"loaded_at": self._loaded_at,
"busy_time_us": read_busy(),
"max_input_chars": MAX_INPUT_CHARS,
"jobs": sorted(PROMPTS),
"bind": f"{HOST}:{PORT}",
}
def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> dict[str, Any]:
return {
"model": MODEL_ID,
"device": "NPU",
"job": job,
"text": result.text,
"json": result.parsed_json,
"timing_ms": result.timing_ms,
"npu_busy_delta_us": result.npu_busy_delta_us,
"npu_busy_before_us": result.npu_busy_before_us,
"npu_busy_after_us": result.npu_busy_after_us,
"cache_dir": str(worker.cache_dir),
}
def make_handler(worker: NpuWorker):
class Handler(BaseHTTPRequestHandler):
server_version = "openvino-genai-npu-worker/0.1"
def log_message(self, format: str, *args: Any) -> None:
# Log only method/path/status metadata, not raw request bodies.
print(f"{self.client_address[0]} {format % args}")
def send_json(self, status: int, payload: Any) -> None:
body = json.dumps(payload, indent=2).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None: # noqa: N802
path = urlparse(self.path).path
if path == "/healthz":
self.send_json(200, worker.health())
elif path == "/models":
self.send_json(200, {"models": [{"id": MODEL_ID, "path": str(worker.model_path), "device": "NPU"}]})
else:
self.send_json(404, {"error": "not found"})
def do_POST(self) -> None: # noqa: N802
path = urlparse(self.path).path
route_job = {
"/v1/worker/generate": None,
"/v1/worker/extract-memory-candidates": "memory_candidate",
"/v1/worker/condense-notification": "notification",
}.get(path, "__missing__")
if route_job == "__missing__":
self.send_json(404, {"error": "not found"})
return
try:
length = int(self.headers.get("Content-Length", "0"))
payload = json.loads(self.rfile.read(length) or b"{}")
job = route_job or str(payload.get("job", "summary"))
if job == "memory":
job = "memory_candidate"
result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens"))
self.send_json(200, response_payload(worker, job, result))
except Exception as exc:
self.send_json(400, {"error": str(exc)})
return Handler
def cli(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker")
parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH))
parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR))
parser.add_argument("--host", default=HOST)
parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT)))
parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP")
parser.add_argument("--input", help="Input text for --job")
parser.add_argument("--max-new-tokens", type=int)
args = parser.parse_args(argv)
worker = NpuWorker(args.model_path, args.cache_dir)
if args.job:
result = worker.generate(args.job, args.input or "", args.max_new_tokens)
print(json.dumps(response_payload(worker, args.job, result), indent=2))
return 0 if result.npu_busy_delta_us > 0 else 2
if args.host != "127.0.0.1":
raise SystemExit("Refusing non-local bind without code change/explicit approval")
server = ThreadingHTTPServer((args.host, args.port), make_handler(worker))
print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged")
server.serve_forever()
return 0
if __name__ == "__main__":
raise SystemExit(cli())
+138
View File
@@ -0,0 +1,138 @@
# OpenVINO NPU reranker service
Local-first cross-encoder reranker prototype for second-stage RAG ranking.
- Default bind: `127.0.0.1:18818`
- Default model: `cross-encoder/ms-marco-MiniLM-L6-v2`
- Default device: `NPU`
- Model cache: `/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov/`
- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` delta before/after inference
This service is intentionally not wired into live RAG by default.
## Files
- `server.py` — stdlib HTTP OpenVINO Runtime service.
- `smoke.py` — non-private API/ranking/NPU busy-time smoke test.
- `openvino-reranker.service` — optional user-systemd unit.
## One-time setup
Use a separate venv so the existing Whisper/embeddings NPU venv is not perturbed:
```bash
python -m venv /home/will/.venvs/openvino-reranker
source /home/will/.venvs/openvino-reranker/bin/activate
python -m pip install -U pip
python -m pip install "openvino>=2026.2" "optimum-intel[openvino]" transformers tokenizers nncf numpy
```
Export the model:
```bash
source /home/will/.venvs/openvino-reranker/bin/activate
optimum-cli export openvino \
--model cross-encoder/ms-marco-MiniLM-L6-v2 \
--task text-classification \
--weight-format int8 \
--trust-remote-code false \
/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov
```
If INT8 export or NPU compile fails, export an FP16/FP32 IR to a separate directory and point `OPENVINO_RERANKER_MODEL_DIR` at it while debugging. Do not overwrite existing vector/RAG/Chroma collections.
## Run in foreground
Check the port and NPU counter first:
```bash
ss -ltnp | grep ':18818 ' || true
cat /sys/class/accel/accel0/device/npu_busy_time_us
```
Start locally:
```bash
source /home/will/.venvs/openvino-reranker/bin/activate
OPENVINO_RERANKER_HOST=127.0.0.1 \
OPENVINO_RERANKER_PORT=18818 \
OPENVINO_RERANKER_DEVICE=NPU \
OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov \
python /home/will/lab/swarm/openvino-reranker-npu/server.py
```
Startup performs a non-private smoke inference and fails closed when `OPENVINO_RERANKER_DEVICE=NPU` but `npu_busy_time_us` does not increase.
## API
Health:
```bash
curl -sS http://127.0.0.1:18818/healthz | jq
curl -sS http://127.0.0.1:18818/readyz | jq
```
Rerank:
```bash
curl -sS http://127.0.0.1:18818/rerank \
-H 'Content-Type: application/json' \
-d '{
"query":"how do I verify OpenVINO NPU usage?",
"documents":[
{"id":"good","text":"Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."},
{"id":"bad","text":"This note is about making sourdough starter."}
],
"top_k":2
}' | jq
```
Compatibility alias:
```bash
curl -sS http://127.0.0.1:18818/v1/rerank \
-H 'Content-Type: application/json' \
-d '{"model":"local-reranker","query":"npu busy time","documents":["OpenVINO NPU busy time proves accelerator use."],"top_n":1}' | jq
```
## Smoke test
```bash
source /home/will/.venvs/openvino-reranker/bin/activate
python /home/will/lab/swarm/openvino-reranker-npu/smoke.py --url http://127.0.0.1:18818
```
Expected:
- `/readyz` is HTTP 200 and reports `device=NPU`.
- Each fixture returns `ok=true` and a sorted `results` list.
- The top result matches the non-private fixture expectation.
- Response and sysfs `npu_busy_delta_us` are positive.
## Optional systemd user service
Install the unit only after the foreground command and smoke test pass:
```bash
cp /home/will/lab/swarm/openvino-reranker-npu/openvino-reranker.service /home/will/.config/systemd/user/openvino-reranker.service
systemctl --user daemon-reload
systemctl --user start openvino-reranker.service
systemctl --user status openvino-reranker.service --no-pager
journalctl --user -u openvino-reranker.service -n 100 --no-pager
```
Do not enable or integrate it into live RAG without explicit approval.
## Optional RAG integration plan (disabled by default)
RAG should keep vector search against `obsidian_bge_npu` unchanged, retrieve a larger candidate set, and call this service as a read-only request-time second stage. Suggested disabled-by-default knobs:
```text
RAG_RERANK_ENABLED=false
RAG_RERANK_URL=http://127.0.0.1:18818/rerank
RAG_RERANK_INITIAL_K=20
RAG_RERANK_TOP_K=5
RAG_RERANK_TIMEOUT_MS=3000
```
On reranker timeout/error, fall back to vector order and include metadata such as `rerank_error`; do not mutate or reindex Chroma collections.
@@ -0,0 +1,19 @@
[Unit]
Description=OpenVINO NPU Reranker HTTP Service (port 18818)
After=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/will/lab/swarm/openvino-reranker-npu
Environment=OPENVINO_RERANKER_HOST=127.0.0.1
Environment=OPENVINO_RERANKER_PORT=18818
Environment=OPENVINO_RERANKER_MODEL=cross-encoder/ms-marco-MiniLM-L6-v2
Environment=OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov
Environment=OPENVINO_RERANKER_DEVICE=NPU
Environment=OPENVINO_RERANKER_MAX_LENGTH=512
ExecStart=/home/will/.venvs/openvino-reranker/bin/python /home/will/lab/swarm/openvino-reranker-npu/server.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=default.target
+369
View File
@@ -0,0 +1,369 @@
#!/usr/bin/env python3
"""OpenVINO NPU cross-encoder reranker HTTP service.
Default port: 18818
Default model: cross-encoder/ms-marco-MiniLM-L6-v2 exported as OpenVINO IR
Default device: NPU
Endpoints:
GET /, /healthz, /readyz
POST /rerank
POST /v1/rerank
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
import numpy as np
import openvino as ov
from transformers import AutoTokenizer
DEFAULT_MODEL_ID = "cross-encoder/ms-marco-MiniLM-L6-v2"
DEFAULT_MODEL_DIR = Path("/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov")
DEFAULT_PORT = 18818
DEFAULT_MAX_LENGTH = 512
DEFAULT_MAX_DOCUMENTS = 100
DEFAULT_MAX_BODY_BYTES = 5 * 1024 * 1024
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def npu_busy_time_us() -> int | None:
try:
return int(NPU_BUSY_FILE.read_text().strip())
except Exception:
return None
def sigmoid(x: float) -> float:
if x >= 0:
z = math.exp(-x)
return 1.0 / (1.0 + z)
z = math.exp(x)
return z / (1.0 + z)
def softmax_prob(logits: np.ndarray, index: int = 1) -> float:
row = np.asarray(logits, dtype=np.float64).reshape(-1)
shifted = row - np.max(row)
probs = np.exp(shifted) / np.sum(np.exp(shifted))
return float(probs[index])
class RerankerService:
def __init__(
self,
model_dir: Path,
model_id: str,
device: str,
max_length: int,
startup_smoke: bool = True,
) -> None:
self.model_dir = model_dir
self.model_id = model_id
self.device = device
self.max_length = int(max_length)
self.loaded_at = time.time()
self.lock = threading.Lock()
self.last_inference: dict[str, Any] | None = None
self.startup_smoke: dict[str, Any] | None = None
self.ready = False
self.ready_error: str | None = None
if not self.model_dir.exists():
raise FileNotFoundError(f"model directory not found: {self.model_dir}")
self.core = ov.Core()
self.available_devices = list(self.core.available_devices)
if self.device not in self.available_devices:
raise RuntimeError(f"OpenVINO device {self.device!r} unavailable; available={self.available_devices}")
xml_path = self.model_dir / "openvino_model.xml"
if not xml_path.exists():
raise FileNotFoundError(f"OpenVINO IR not found: {xml_path}")
self.tokenizer = AutoTokenizer.from_pretrained(str(self.model_dir), local_files_only=True)
model = self.core.read_model(str(xml_path))
self._reshape_static(model)
self.compiled = self.core.compile_model(model, self.device)
self.input_names = {inp.get_any_name() for inp in self.compiled.inputs}
self.output = self.compiled.output(0)
if startup_smoke:
try:
smoke = self.rerank(
"npu busy time",
[{"id": "smoke", "text": "OpenVINO NPU usage is verified by npu_busy_time_us."}],
top_k=1,
return_documents=False,
)
self.startup_smoke = {
"ok": bool(smoke.get("ok")),
"duration_ms": smoke.get("duration_ms"),
"npu_busy_delta_us": smoke.get("npu_busy_delta_us"),
}
if self.device == "NPU" and int(smoke.get("npu_busy_delta_us") or 0) <= 0:
raise RuntimeError("startup smoke did not increase npu_busy_time_us")
except Exception as exc:
self.ready_error = f"startup smoke failed: {type(exc).__name__}: {exc}"
raise
self.ready = True
def _reshape_static(self, model: ov.Model) -> None:
shape_by_name: dict[str, list[int]] = {}
for inp in model.inputs:
name = inp.get_any_name()
if name in {"input_ids", "attention_mask", "token_type_ids"}:
shape_by_name[name] = [1, self.max_length]
if shape_by_name:
model.reshape(shape_by_name)
def _tokenize(self, query: str, document: str) -> dict[str, np.ndarray]:
tokens = self.tokenizer(
query,
document,
max_length=self.max_length,
padding="max_length",
truncation=True,
return_tensors="np",
)
return {name: np.asarray(value) for name, value in tokens.items() if name in self.input_names}
def _score_pair(self, query: str, document: str) -> dict[str, float | None]:
inputs = self._tokenize(query, document)
missing = self.input_names - set(inputs)
# Some exported BERT models do not use token_type_ids. input_ids and attention_mask are required.
required_missing = missing & {"input_ids", "attention_mask"}
if required_missing:
raise RuntimeError(f"tokenizer did not produce required inputs: {sorted(required_missing)}")
outputs = self.compiled(inputs)
logits = np.asarray(outputs[self.output])
flat = logits.reshape(-1)
if flat.size == 1:
raw = float(flat[0])
return {"score": raw, "raw_score": raw, "probability": sigmoid(raw)}
if flat.size >= 2:
raw = float(flat[1])
return {"score": raw, "raw_score": raw, "probability": softmax_prob(flat, 1)}
raise RuntimeError(f"unexpected empty logits shape: {list(logits.shape)}")
def rerank(
self,
query: str,
documents: list[dict[str, Any]],
*,
top_k: int | None,
return_documents: bool = True,
) -> dict[str, Any]:
before = npu_busy_time_us()
started = time.perf_counter()
results: list[dict[str, Any]] = []
with self.lock:
for idx, doc in enumerate(documents):
scored = self._score_pair(query, str(doc["text"]))
item: dict[str, Any] = {
"index": idx,
"score": scored["score"],
"raw_score": scored["raw_score"],
"probability": scored["probability"],
}
if doc.get("id") is not None:
item["id"] = doc.get("id")
if return_documents:
item["text"] = doc["text"]
item["metadata"] = doc.get("metadata") if isinstance(doc.get("metadata"), dict) else {}
results.append(item)
after = npu_busy_time_us()
results.sort(key=lambda item: (-float(item["score"]), int(item["index"])))
clamped_top_k = len(results) if top_k is None else max(1, min(int(top_k), len(results)))
duration_ms = round((time.perf_counter() - started) * 1000, 3)
npu_delta = None if before is None or after is None else after - before
payload = {
"ok": True,
"model": self.model_id,
"model_dir": str(self.model_dir),
"device": self.device,
"query": query,
"input_count": len(documents),
"top_k": clamped_top_k,
"duration_ms": duration_ms,
"npu_busy_delta_us": npu_delta,
"results": results[:clamped_top_k],
}
self.last_inference = {
"duration_ms": duration_ms,
"docs": len(documents),
"npu_busy_delta_us": npu_delta,
}
return payload
def health(self) -> dict[str, Any]:
status = "ok" if self.ready else "degraded"
return {
"status": status,
"ok": self.ready,
"service": "openvino-reranker",
"model": self.model_id,
"model_dir": str(self.model_dir),
"device": self.device,
"available_devices": self.available_devices,
"max_length": self.max_length,
"input_names": sorted(self.input_names),
"uptime_s": round(time.time() - self.loaded_at, 3),
"npu_busy_time_us": npu_busy_time_us(),
"startup_smoke": self.startup_smoke,
"last_inference": self.last_inference,
"ready_error": self.ready_error,
}
def normalize_documents(value: Any, max_documents: int) -> list[dict[str, Any]]:
if not isinstance(value, list) or not value:
raise ValueError("documents must be a non-empty list")
if len(value) > max_documents:
raise ValueError(f"documents exceeds max_documents={max_documents}")
docs: list[dict[str, Any]] = []
for idx, item in enumerate(value):
if isinstance(item, str):
text = item
doc: dict[str, Any] = {"text": text}
elif isinstance(item, dict):
text = item.get("text")
doc = {
"id": item.get("id"),
"text": text,
"metadata": item.get("metadata") if isinstance(item.get("metadata"), dict) else {},
}
else:
raise ValueError(f"documents[{idx}] must be a string or object")
if not isinstance(text, str) or not text.strip():
raise ValueError(f"documents[{idx}].text must be a non-empty string")
docs.append(doc)
return docs
class Handler(BaseHTTPRequestHandler):
server_version = "OpenVINOReranker/0.1"
@property
def svc(self) -> RerankerService:
return self.server.reranker_service # type: ignore[attr-defined]
@property
def max_body_bytes(self) -> int:
return self.server.max_body_bytes # type: ignore[attr-defined]
@property
def max_documents(self) -> int:
return self.server.max_documents # type: ignore[attr-defined]
def do_GET(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path == "/":
self.write_json({"ok": True, "service": "openvino-reranker", "endpoints": ["/healthz", "/readyz", "/rerank", "/v1/rerank"]})
elif path in {"/healthz", "/health"}:
self.write_json(self.svc.health(), status=200)
elif path == "/readyz":
health = self.svc.health()
self.write_json(health, status=200 if health.get("ok") else 503)
else:
self.write_json({"ok": False, "error": "not found", "results": []}, status=404)
def do_POST(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
try:
if path not in {"/rerank", "/v1/rerank"}:
self.write_json({"ok": False, "error": "not found", "results": []}, status=404)
return
if not self.svc.ready:
self.write_json({"ok": False, "error": self.svc.ready_error or "model not ready", "results": []}, status=503)
return
payload = self.read_json()
query = payload.get("query")
if not isinstance(query, str) or not query.strip():
raise ValueError("query is required")
top_k = payload.get("top_k", payload.get("top_n"))
documents = normalize_documents(payload.get("documents"), self.max_documents)
return_documents = bool(payload.get("return_documents", True))
response = self.svc.rerank(query.strip(), documents, top_k=top_k, return_documents=return_documents)
self.write_json(response)
except RequestTooLarge as exc:
self.write_json({"ok": False, "error": str(exc), "results": []}, status=413)
except ValueError as exc:
self.write_json({"ok": False, "error": str(exc), "results": []}, status=400)
except Exception as exc:
self.write_json({"ok": False, "error": f"{type(exc).__name__}: {exc}", "results": []}, status=500)
def read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length") or 0)
if length > self.max_body_bytes:
raise RequestTooLarge(f"request body exceeds {self.max_body_bytes} bytes")
body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
payload = json.loads(body or "{}")
if not isinstance(payload, dict):
raise ValueError("JSON body must be an object")
return payload
def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name
print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
class RequestTooLarge(ValueError):
pass
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--host", default=os.environ.get("OPENVINO_RERANKER_HOST", "127.0.0.1"))
parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_RERANKER_PORT", DEFAULT_PORT)))
parser.add_argument("--model-dir", default=os.environ.get("OPENVINO_RERANKER_MODEL_DIR", str(DEFAULT_MODEL_DIR)))
parser.add_argument("--model", default=os.environ.get("OPENVINO_RERANKER_MODEL", DEFAULT_MODEL_ID))
parser.add_argument("--device", default=os.environ.get("OPENVINO_RERANKER_DEVICE", "NPU"))
parser.add_argument("--max-length", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_LENGTH", str(DEFAULT_MAX_LENGTH))))
parser.add_argument("--max-documents", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_DOCUMENTS", str(DEFAULT_MAX_DOCUMENTS))))
parser.add_argument("--max-body-bytes", type=int, default=int(os.environ.get("OPENVINO_RERANKER_MAX_BODY_BYTES", str(DEFAULT_MAX_BODY_BYTES))))
parser.add_argument("--skip-startup-smoke", action="store_true", default=os.environ.get("OPENVINO_RERANKER_SKIP_STARTUP_SMOKE", "").lower() in {"1", "true", "yes"})
args = parser.parse_args()
service = RerankerService(
Path(args.model_dir).expanduser(),
args.model,
args.device,
args.max_length,
startup_smoke=not args.skip_startup_smoke,
)
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.reranker_service = service # type: ignore[attr-defined]
httpd.max_body_bytes = args.max_body_bytes # type: ignore[attr-defined]
httpd.max_documents = args.max_documents # type: ignore[attr-defined]
print(
f"openvino-reranker listening on {args.host}:{args.port} model={args.model} "
f"model_dir={args.model_dir} device={args.device} max_length={args.max_length}",
flush=True,
)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
+167
View File
@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""Smoke/benchmark checks for the OpenVINO reranker service.
Prints a JSON summary and exits non-zero on schema/ranking/NPU verification failure.
Uses only non-private fixture text.
"""
from __future__ import annotations
import argparse
import json
import statistics
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
FIXTURES = [
{
"query": "how do I verify OpenVINO NPU usage?",
"documents": [
{"id": "good", "text": "Check /sys/class/accel/accel0/device/npu_busy_time_us before and after inference."},
{"id": "bad", "text": "This note is about making sourdough starter."},
],
"expected_top_id": "good",
},
{
"query": "what port does the reranker service use?",
"documents": [
{"id": "unrelated", "text": "Whisper transcription accepts audio uploads."},
{"id": "port", "text": "The OpenVINO reranker prototype listens locally on port 18818."},
],
"expected_top_id": "port",
},
{
"query": "why should reranking not mutate vector collections?",
"documents": [
{"id": "mutation", "text": "Reranking is a read-only second-stage transformation after vector search."},
{"id": "cooking", "text": "Boil pasta in salted water until al dente."},
],
"expected_top_id": "mutation",
},
]
def npu_busy_time_us() -> int | None:
try:
return int(NPU_BUSY_FILE.read_text().strip())
except Exception:
return None
def post_json(url: str, payload: dict[str, Any], timeout: float) -> tuple[int, dict[str, Any]]:
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST")
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read().decode("utf-8", "replace")
return resp.status, json.loads(body)
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", "replace")
try:
parsed = json.loads(body)
except Exception:
parsed = {"error": body}
return exc.code, parsed
def get_json(url: str, timeout: float) -> tuple[int, dict[str, Any]]:
try:
with urllib.request.urlopen(url, timeout=timeout) as resp:
body = resp.read().decode("utf-8", "replace")
return resp.status, json.loads(body)
except urllib.error.HTTPError as exc:
body = exc.read().decode("utf-8", "replace")
try:
parsed = json.loads(body)
except Exception:
parsed = {"error": body}
return exc.code, parsed
def percentile(values: list[float], pct: float) -> float | None:
if not values:
return None
ordered = sorted(values)
idx = min(len(ordered) - 1, max(0, round((pct / 100.0) * (len(ordered) - 1))))
return round(ordered[idx], 3)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--url", default="http://127.0.0.1:18818")
parser.add_argument("--timeout", type=float, default=20.0)
parser.add_argument("--allow-cpu", action="store_true", help="do not fail when health reports a non-NPU device")
args = parser.parse_args()
base = args.url.rstrip("/")
failures: list[str] = []
health_status, health = get_json(f"{base}/readyz", args.timeout)
if health_status != 200 or not health.get("ok"):
failures.append(f"readyz failed status={health_status} error={health.get('ready_error') or health.get('error')}")
device = health.get("device")
if device != "NPU" and not args.allow_cpu:
failures.append(f"device is {device!r}, expected 'NPU'")
latencies: list[float] = []
response_npu_total = 0
sysfs_npu_total = 0
top1_passed = 0
for case in FIXTURES:
before = npu_busy_time_us()
started = time.perf_counter()
status, payload = post_json(
f"{base}/rerank",
{"query": case["query"], "documents": case["documents"], "top_k": len(case["documents"]), "return_documents": False},
args.timeout,
)
wall_ms = (time.perf_counter() - started) * 1000
after = npu_busy_time_us()
latencies.append(float(payload.get("duration_ms") or wall_ms))
response_delta = payload.get("npu_busy_delta_us")
sysfs_delta = None if before is None or after is None else after - before
if isinstance(response_delta, int):
response_npu_total += response_delta
if isinstance(sysfs_delta, int):
sysfs_npu_total += sysfs_delta
results = payload.get("results") if isinstance(payload, dict) else None
top_id = results[0].get("id") if isinstance(results, list) and results else None
if status != 200 or not payload.get("ok"):
failures.append(f"case {case['expected_top_id']} HTTP/status failed: status={status} error={payload.get('error')}")
if not isinstance(results, list) or len(results) != len(case["documents"]):
failures.append(f"case {case['expected_top_id']} returned invalid results")
if top_id == case["expected_top_id"]:
top1_passed += 1
else:
failures.append(f"case {case['expected_top_id']} top_id={top_id!r}")
if device == "NPU":
if not isinstance(response_delta, int) or response_delta <= 0:
failures.append(f"case {case['expected_top_id']} response npu delta not positive: {response_delta}")
if not isinstance(sysfs_delta, int) or sysfs_delta <= 0:
failures.append(f"case {case['expected_top_id']} sysfs npu delta not positive: {sysfs_delta}")
summary = {
"ok": not failures,
"url": base,
"model": health.get("model"),
"device": device,
"cases": len(FIXTURES),
"top1_passed": top1_passed,
"p50_ms": percentile(latencies, 50),
"p95_ms": percentile(latencies, 95),
"mean_ms": round(statistics.mean(latencies), 3) if latencies else None,
"npu_busy_delta_us_total": sysfs_npu_total,
"response_npu_busy_delta_us_total": response_npu_total,
"failures": failures,
}
print(json.dumps(summary, indent=2, sort_keys=True))
return 0 if not failures else 1
if __name__ == "__main__":
raise SystemExit(main())
+110
View File
@@ -0,0 +1,110 @@
#!/usr/bin/env bash
set -euo pipefail
# Read-only health probe for Will's local OpenVINO/NPU services.
# This script intentionally does not start, stop, restart, enable, reindex, or route anything.
BUSY_PATH=${BUSY_PATH:-/sys/class/accel/accel0/device/npu_busy_time_us}
CURL_TIMEOUT=${CURL_TIMEOUT:-8}
EMBED_MODEL=${EMBED_MODEL:-bge-base-en-v1.5-int8-ov}
EMBED_URL=${EMBED_URL:-http://127.0.0.1:18817/v1/embeddings}
have() { command -v "$1" >/dev/null 2>&1; }
json_pretty() {
if have jq; then
jq .
else
python -m json.tool
fi
}
section() {
printf '\n== %s ==\n' "$1"
}
http_json() {
local name=$1 url=$2
printf '\n[%s] %s\n' "$name" "$url"
if ! curl -fsS --max-time "$CURL_TIMEOUT" "$url" | json_pretty; then
printf 'status=unavailable_or_non_json\n'
return 1
fi
}
busy_value() {
if [[ -r "$BUSY_PATH" ]]; then
tr -d '\n' < "$BUSY_PATH"
else
printf 'missing'
fi
}
section "NPU counter"
printf 'busy_path=%s\n' "$BUSY_PATH"
printf 'busy_time_us=%s\n' "$(busy_value)"
section "Listeners"
ss -ltnp | grep -E ':(18810|18814|18816|18817|18818|18819|18820|18828|18829)\b' || true
section "User service states"
for unit in \
openvino-embeddings.service \
rag-embedding-health.service \
openvino-reranker.service \
openvino-router-classifier.service \
openvino-genai-npu-worker.service; do
active=$(systemctl --user is-active "$unit" 2>/dev/null || true)
enabled=$(systemctl --user is-enabled "$unit" 2>/dev/null || true)
printf '%-38s active=%-10s enabled=%s\n' "$unit" "${active:-unknown}" "${enabled:-unknown}"
done
section "Docker service states"
if [[ -d /home/will/lab/swarm ]]; then
(cd /home/will/lab/swarm && docker compose ps whisper-server-npu 2>/dev/null) || true
fi
section "HTTP health"
http_json "RAG endpoint" "http://127.0.0.1:18810/healthz" || true
http_json "RAG/embedding health wrapper" "http://127.0.0.1:18814/healthz" || true
http_json "Whisper NPU" "http://127.0.0.1:18816/health" || true
http_json "OpenVINO embeddings" "http://127.0.0.1:18817/healthz" || true
# Prototypes are expected to be unavailable until explicitly started/approved.
http_json "NPU reranker prototype" "http://127.0.0.1:18818/readyz" || true
http_json "NPU router classifier prototype" "http://127.0.0.1:18819/healthz" || true
http_json "NPU GenAI worker prototype" "http://127.0.0.1:18820/healthz" || true
section "Embeddings NPU busy-time proof"
if [[ ! -r "$BUSY_PATH" ]]; then
printf 'result=failed reason=missing_busy_counter\n'
exit 2
fi
before=$(busy_value)
response=$(curl -fsS --max-time "$CURL_TIMEOUT" \
"$EMBED_URL" \
-H 'Content-Type: application/json' \
-d "{\"input\":\"non-private npu health probe\",\"model\":\"$EMBED_MODEL\"}" || true)
after=$(busy_value)
if [[ -z "$response" ]]; then
printf 'result=failed reason=embedding_request_failed before_us=%s after_us=%s\n' "$before" "$after"
exit 3
fi
delta=$((after - before))
printf 'sysfs_before_us=%s\nsysfs_after_us=%s\nsysfs_delta_us=%s\n' "$before" "$after" "$delta"
RESPONSE_JSON="$response" python - <<'PY' || true
import json, os
try:
data = json.loads(os.environ.get('RESPONSE_JSON', ''))
except Exception as exc:
print(f'response_parse_error={type(exc).__name__}: {exc}')
raise SystemExit(0)
print(f"response_object={data.get('object')}")
print(f"response_model={data.get('model')}")
print(f"response_npu_busy_delta_us={data.get('npu_busy_delta_us')}")
print(f"embedding_count={len(data.get('data', []))}")
PY
if (( delta <= 0 )); then
printf 'result=failed reason=no_positive_sysfs_npu_delta\n'
exit 4
fi
printf 'result=ok\n'
@@ -1,7 +1,7 @@
--- ---
type: service-catalog type: service-catalog
created: 2026-05-14T14:50:46-07:00 created: 2026-05-14T14:50:46-07:00
updated: 2026-06-03T21:31:01-07:00 updated: 2026-06-04T11:35:00-07:00
tags: tags:
- service-catalog - service-catalog
- swarm - swarm
@@ -54,7 +54,12 @@ Canonical index of local services, automation tools, Hermes capabilities, and wh
| URL extractor | 18812 | OK 200 | URL/PDF/YouTube content extractor | `http://127.0.0.1:18812/healthz` | | URL extractor | 18812 | OK 200 | URL/PDF/YouTube content extractor | `http://127.0.0.1:18812/healthz` |
| Voice memo processor | 18813 | OK 200 | Voice memo processor | `http://127.0.0.1:18813/healthz` | | Voice memo processor | 18813 | OK 200 | Voice memo processor | `http://127.0.0.1:18813/healthz` |
| RAG/embedding health | 18814 | OK 200 | RAG/OpenVINO/Obsidian health wrapper | `http://127.0.0.1:18814/healthz` | | RAG/embedding health | 18814 | OK 200 | RAG/OpenVINO/Obsidian health wrapper | `http://127.0.0.1:18814/healthz` |
| Whisper OpenVINO NPU | 18816 | OK 200 / Docker healthy on 2026-06-04 | Intel NPU Whisper transcription service | `http://127.0.0.1:18816/health` |
| OpenVINO embeddings | 18817 | OK 200 | Intel NPU embeddings service for live Obsidian RAG | `http://127.0.0.1:18817/health` | | OpenVINO embeddings | 18817 | OK 200 | Intel NPU embeddings service for live Obsidian RAG | `http://127.0.0.1:18817/health` |
| OpenVINO NPU reranker prototype | 18818 | approved prototype; not enabled live | Optional second-stage RAG reranker | `http://127.0.0.1:18818/readyz` |
| OpenVINO router/classifier prototype | 18819 | approved prototype; not enabled live | Dry-run Atlas/Hermes message classifier/router | `http://127.0.0.1:18819/healthz` |
| OpenVINO GenAI NPU worker prototype | 18820 | approved prototype; not enabled live | Bounded local background generation worker | `http://127.0.0.1:18820/healthz` |
| OpenVINO document/image triage prototype | 18828/18829 | approved foreground prototype; not enabled live | Local document/image triage with NPU embeddings stage via `:18817` | `http://127.0.0.1:<port>/healthz` |
| Obsidian REST HTTP | 27123 | OK 200 | Obsidian Local REST API HTTP | `http://127.0.0.1:27123/` | | Obsidian REST HTTP | 27123 | OK 200 | Obsidian Local REST API HTTP | `http://127.0.0.1:27123/` |
## Docker services ## Docker services
@@ -77,6 +82,7 @@ make status
make local-ai-health make local-ai-health
make api-health make api-health
make timers make timers
./scripts/npu-service-health.sh
``` ```
## Host-side systemd/user services ## Host-side systemd/user services
@@ -93,6 +99,9 @@ Important known services:
| `voice-memo-processor.service` | Voice memo processing on 18813 | | `voice-memo-processor.service` | Voice memo processing on 18813 |
| `rag-embedding-health.service` | RAG/OpenVINO/Obsidian health check wrapper on 18814 | | `rag-embedding-health.service` | RAG/OpenVINO/Obsidian health check wrapper on 18814 |
| `openvino-embeddings.service` | Intel NPU BGE embedding service on 18817 | | `openvino-embeddings.service` | Intel NPU BGE embedding service on 18817 |
| `openvino-reranker.service` | Optional NPU reranker prototype on 18818; not installed/enabled without approval |
| `openvino-router-classifier.service` | Optional dry-run router/classifier prototype on 18819; not installed/enabled without approval |
| `openvino-genai-npu-worker.service` | Optional bounded GenAI worker prototype on 18820; not installed/enabled without approval |
Useful checks: Useful checks:
@@ -275,6 +284,7 @@ Profile Model Gateway Alias Distribu
| Hermes CLI/toolsets/gateway/profiles | Hermes skill `hermes-agent`; `hermes --help`; `hermes tools list` | | Hermes CLI/toolsets/gateway/profiles | Hermes skill `hermes-agent`; `hermes --help`; `hermes tools list` |
| Obsidian automation workflows | `~/lab/swarm/swarm-common/n8n-workflows/obsidian-*.json` | | Obsidian automation workflows | `~/lab/swarm/swarm-common/n8n-workflows/obsidian-*.json` |
| Runbooks | [[Runbooks Home]] | | Runbooks | [[Runbooks Home]] |
| OpenVINO NPU service operations | [[OpenVINO NPU Services Runbook]]; `~/lab/swarm/scripts/npu-service-health.sh` |
## Safety notes ## Safety notes
@@ -0,0 +1,268 @@
---
type: runbook
system: openvino-npu-services
status: draft
created: 2026-06-04
updated: 2026-06-04
tags:
- runbook
- openvino
- npu
- swarm
- atlas
related:
- [[Service Catalog]]
- [[Swarm Operating Manual]]
- [[Atlas Capability Upgrade Program]]
---
# OpenVINO NPU Services Runbook
This runbook is the integrated operations view for Will's local Intel NPU/OpenVINO services from the `npu-capability-expansion` board.
Safety posture:
- Do not restart the live Atlas/Hermes gateway from this runbook.
- Do not change primary Atlas/Hermes routing without explicit Will approval.
- Do not delete, overwrite, or in-place reindex existing Chroma/vector collections.
- Treat HTTP 200 as necessary but not sufficient for NPU-backed services; verify `/sys/class/accel/accel0/device/npu_busy_time_us` before/after an inference.
- Keep endpoints local-only unless Will explicitly approves broader exposure.
- Keep raw prompts, private documents, OCR text, and secrets out of logs and durable handoffs.
## Current service map
| Capability | Port | Runtime / service | Path | State | Health endpoint | NPU proof |
| --- | ---: | --- | --- | --- | --- | --- |
| Obsidian/RAG endpoint | 18810 | `obsidian-reindex-endpoint.service` / local Python endpoint | `~/lab/swarm/scripts/` | live baseline; uses collection `obsidian_bge_npu` | `http://127.0.0.1:18810/healthz` | indirect via embeddings `:18817`; do not mutate existing collection |
| RAG/embedding health wrapper | 18814 | `rag-embedding-health.service` | `~/lab/swarm/swarm-common/rag-embedding-health.service` | live baseline | `http://127.0.0.1:18814/healthz` | should exercise embeddings path when configured |
| Whisper transcription, OpenVINO NPU | 18816 | Docker Compose service/container `whisper-server-npu` | `~/lab/swarm/whisper-openvino-npu/` | live baseline | `http://127.0.0.1:18816/health` | transcription response includes `npu_busy_delta_us`; sysfs delta must increase |
| OpenVINO embeddings | 18817 | user systemd `openvino-embeddings.service` | `~/lab/swarm/scripts/openvino-embeddings-server.py`; unit in `~/lab/swarm/swarm-common/openvino-embeddings.service` | live baseline, enabled | `http://127.0.0.1:18817/health` | embedding response and sysfs delta must be positive |
| NPU reranker prototype | 18818 | optional user systemd `openvino-reranker.service` | `~/lab/swarm/openvino-reranker-npu/` | approved prototype; not installed/enabled | `http://127.0.0.1:18818/readyz` | `/readyz` reports `device=NPU`; `/v1/rerank` response and sysfs delta must be positive |
| NPU router/classifier prototype | 18819 | optional user systemd `openvino-router-classifier.service` | `~/lab/swarm/openvino-classifier-npu/` | approved prototype; not installed/enabled | `http://127.0.0.1:18819/healthz` | `/v1/classify` response has positive `npu_busy_delta_us` and `sysfs_npu_busy_delta_us` |
| Small OpenVINO GenAI NPU worker | 18820 | optional user systemd `openvino-genai-npu-worker.service` | `~/lab/swarm/openvino-genai-npu-worker/` | approved prototype; not installed/enabled | `http://127.0.0.1:18820/healthz`; `GET /models` | generation response includes positive `npu_busy_delta_us` |
| Document/image triage prototype | 18828 or 18829 for review only | foreground local-only server; no persistent unit yet | `~/lab/swarm/openvino-doc-image-triage-npu/` | approved prototype; not installed/enabled | `http://127.0.0.1:<port>/healthz`; `GET /models` | v1 NPU stage is semantic embedding through `:18817`; image classification/OCR remain CPU/local |
Port notes:
- `18818`, `18819`, and `18820` are reserved prototype ports from the program plan; check listeners before binding.
- `18820` was used by the GenAI worker prototype. The document/image triage prototype README still contains a `18820` example, but review used `18828`/`18829` to avoid collision. Prefer `18828`/`18829` for triage foreground review until Will approves a final persistent port.
- Existing `:18817` is currently bound on `0.0.0.0` by the user service; prototype services should still default to `127.0.0.1`.
## Read-only unified health check
From the swarm repo:
```bash
cd ~/lab/swarm
./scripts/npu-service-health.sh
```
The script is read-only. It checks listeners, user service state, Docker Compose state for `whisper-server-npu`, JSON health endpoints, and performs a non-private embeddings request while measuring `/sys/class/accel/accel0/device/npu_busy_time_us` before and after. A positive sysfs delta is required for the embeddings proof.
Manual minimal checks:
```bash
BUSY=/sys/class/accel/accel0/device/npu_busy_time_us
cat "$BUSY"
ss -ltnp | grep -E ':(18810|18814|18816|18817|18818|18819|18820|18828|18829)\b' || true
systemctl --user is-active openvino-embeddings.service rag-embedding-health.service
cd ~/lab/swarm && docker compose ps whisper-server-npu
curl -fsS http://127.0.0.1:18817/health | jq .
```
Embedding NPU proof:
```bash
BUSY=/sys/class/accel/accel0/device/npu_busy_time_us
before=$(cat "$BUSY")
curl -fsS http://127.0.0.1:18817/v1/embeddings \
-H 'Content-Type: application/json' \
-d '{"input":"non-private npu health probe","model":"bge-base-en-v1.5-int8-ov"}' | jq '{model, object, npu_busy_delta_us, embedding_count:(.data|length)}'
after=$(cat "$BUSY")
echo "sysfs_npu_busy_delta_us=$((after-before))"
```
A healthy NPU path has:
- HTTP success from the endpoint.
- Response-level `npu_busy_delta_us > 0` when the service reports it.
- Sysfs `after - before > 0`.
## Service-specific smoke checks
### Whisper NPU (`:18816`)
```bash
curl -fsS http://127.0.0.1:18816/health | jq .
# For a real transcription smoke, use a small non-private WAV fixture only.
# Verify both response npu_busy_delta_us and sysfs busy-time delta.
```
Operational notes:
- Managed as Docker Compose service/container `whisper-server-npu` in `~/lab/swarm`.
- Consistent with existing swarm service patterns because it is a containerized service with Compose health.
- Do not restart it from this runbook unless Will asked for remediation.
### OpenVINO embeddings (`:18817`)
```bash
systemctl --user status openvino-embeddings.service --no-pager
curl -fsS http://127.0.0.1:18817/health | jq .
```
Operational notes:
- User systemd unit: `openvino-embeddings.service`.
- Model: `bge-base-en-v1.5-int8-ov`.
- Model directory: `/home/will/.cache/openvino-models/bge-base-en-v1.5-int8-ov`.
- Live RAG `:18810` uses Chroma collection `obsidian_bge_npu` through this service. Do not reindex or replace this collection in place.
### Reranker prototype (`:18818`)
Foreground review start only, after confirming port is free:
```bash
ss -ltnp | grep ':18818\b' || true
cd ~/lab/swarm/openvino-reranker-npu
source /home/will/.venvs/openvino-reranker/bin/activate
OPENVINO_RERANKER_HOST=127.0.0.1 \
OPENVINO_RERANKER_PORT=18818 \
OPENVINO_RERANKER_DEVICE=NPU \
OPENVINO_RERANKER_MODEL_DIR=/home/will/.cache/openvino-models/rerankers/ms-marco-MiniLM-L6-v2-int8-ov \
python server.py
```
From another shell:
```bash
curl -fsS http://127.0.0.1:18818/readyz | jq .
python ~/lab/swarm/openvino-reranker-npu/smoke.py --url http://127.0.0.1:18818
```
Approval gate:
- May be installed as `openvino-reranker.service` only after foreground smoke and Will approval.
- May be integrated into RAG only behind disabled-by-default knobs such as `RAG_RERANK_ENABLED=false`; request-time reranking must not mutate Chroma.
### Router/classifier prototype (`:18819`)
Foreground review start only, after confirming port is free:
```bash
ss -ltnp | grep ':18819\b' || true
cd ~/lab/swarm/openvino-classifier-npu
/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819
```
Smoke:
```bash
curl -fsS http://127.0.0.1:18819/healthz | jq .
curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"smoke","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true,"dry_run":true}}' | jq .
```
Approval gate:
- May be installed as `openvino-router-classifier.service` only after Will approves live service enablement.
- Must remain dry-run and must not alter Hermes/Atlas routing, memory writes, safety confirmation flow, or outbound messages without a separate explicit approval.
### Small GenAI NPU worker (`:18820`)
Foreground review start only, after confirming port is free:
```bash
ss -ltnp | grep ':18820\b' || true
cd ~/lab/swarm/openvino-genai-npu-worker
/home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820
```
Smoke:
```bash
curl -fsS http://127.0.0.1:18820/healthz | jq .
curl -fsS http://127.0.0.1:18820/models | jq .
curl -fsS http://127.0.0.1:18820/v1/worker/condense-notification \
-H 'Content-Type: application/json' \
-d '{"input":"Non-private smoke notification for local NPU worker.","max_new_tokens":64}' | jq .
```
Approval gate:
- May be installed as `openvino-genai-npu-worker.service` only after Will approves persistent service enablement.
- Must not become primary Atlas/Hermes model routing. Use only for bounded background jobs such as title, summary, notification condensation, and memory-candidate drafting.
### Document/image triage prototype (`:18828`/`:18829` review ports)
Foreground review start only, after confirming port is free:
```bash
ss -ltnp | grep -E ':(18828|18829)\b' || true
cd ~/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18828 --allowed-root "$PWD"
```
Smoke:
```bash
curl -fsS http://127.0.0.1:18828/healthz | jq .
curl -fsS http://127.0.0.1:18828/models | jq .
/home/will/.venvs/npu/bin/python tests/smoke_test.py
```
Approval gate:
- Do not point it at arbitrary directories; allowed roots must be equal to or under configured roots.
- Do not include raw OCR text or full source paths unless Will explicitly asks for a one-off response.
- v1 only uses the NPU through `:18817` embeddings for needs-attention; image category classification and OCR are CPU/local fallbacks.
## Systemd and Compose recommendations
Recommended management split:
- Keep containerized services in Docker Compose when they already have Docker build/runtime shape and Compose health (`whisper-server-npu`).
- Keep host-side OpenVINO Python prototypes as user systemd services when they depend on local venvs, sysfs NPU access, model caches, and localhost-only APIs (`openvino-embeddings`, optional reranker/classifier/GenAI worker).
- Do not add the prototypes to the live gateway or primary routing during installation. Installation and routing are separate approval gates.
User-systemd unit expectations for optional prototypes:
- `WorkingDirectory` points at the service directory under `~/lab/swarm/`.
- `ExecStart` uses the existing venv path documented by the prototype.
- `Environment` pins host to `127.0.0.1`, port, model path, device `NPU`, and any upstream endpoint.
- `Restart=on-failure`, not aggressive restart loops.
- Logs go to user journal; do not log raw request bodies.
- Start manually for smoke; enable on boot only after Will approval.
Compose expectations for existing swarm services:
- Prefer `cd ~/lab/swarm && make ps`, `make status`, and targeted `docker compose ps <service>` for read-only checks.
- Do not run `docker compose up -d`, restart containers, pull images, or prune volumes from this runbook without approval.
## Monitoring and logging notes
Minimum recurring monitoring should include:
- Listener presence for `18816`, `18817`, and any approved optional prototype ports.
- User service state for `openvino-embeddings.service` and any approved optional prototype unit.
- Docker Compose health for `whisper-server-npu`.
- HTTP health endpoint success.
- Positive sysfs NPU busy-time delta on at least one non-private inference probe, preferably embeddings `:18817` because it is already live and central.
- Journal/container logs only at summary level. Avoid raw prompts, raw OCR text, private document names, credentials, and API keys.
Useful log commands:
```bash
journalctl --user -u openvino-embeddings.service -n 100 --no-pager
journalctl --user -u rag-embedding-health.service -n 100 --no-pager
journalctl --user -u openvino-reranker.service -n 100 --no-pager
journalctl --user -u openvino-router-classifier.service -n 100 --no-pager
journalctl --user -u openvino-genai-npu-worker.service -n 100 --no-pager
cd ~/lab/swarm && docker compose logs --tail 100 whisper-server-npu
```
## Approval gates
Requires explicit Will approval before proceeding:
- Installing, enabling, or autostarting `openvino-reranker.service`, `openvino-router-classifier.service`, or `openvino-genai-npu-worker.service`.
- Assigning a final persistent port to document/image triage or enabling it as a persistent service.
- Enabling live RAG reranking or any request path that changes Atlas/RAG answers.
- Changing primary Atlas/Hermes routing or connecting router/classifier outputs to live decisions.
- Connecting the GenAI worker to primary Atlas chat, gateway routing, memory writes, or outbound notifications.
- Restarting the live Atlas/Hermes gateway.
- Deleting, overwriting, or in-place reindexing existing vector collections.
- Broadening bind addresses or exposure beyond local-only defaults.
Approved/parked outcomes:
- Built/approved prototypes: reranker (`:18818`), router/classifier (`:18819`), small GenAI worker (`:18820`), document/image triage (review ports `:18828`/`:18829`).
- Live baseline retained: Whisper NPU (`:18816`), OpenVINO embeddings (`:18817`), RAG endpoint (`:18810`) using `obsidian_bge_npu`.
- Parked: always-on wake-word/audio and conventional vision detection until Will wants a concrete use case.
- Rejected for this NPU program: diffusion/image generation.