From 6b1cae016c140cef6608f60bd4339dcd92b21fee Mon Sep 17 00:00:00 2001 From: Atlas Date: Thu, 4 Jun 2026 12:06:08 -0700 Subject: [PATCH 1/6] docs: specify OpenVINO doc image triage prototype --- openvino-doc-image-triage-npu/README.md | 17 +-- openvino-doc-image-triage-npu/SPEC.md | 146 ++++++++++++++++++++++++ openvino-doc-image-triage-npu/server.py | 2 +- 3 files changed, 157 insertions(+), 8 deletions(-) create mode 100644 openvino-doc-image-triage-npu/SPEC.md diff --git a/openvino-doc-image-triage-npu/README.md b/openvino-doc-image-triage-npu/README.md index 56890db..f21d7be 100644 --- a/openvino-doc-image-triage-npu/README.md +++ b/openvino-doc-image-triage-npu/README.md @@ -1,7 +1,8 @@ # OpenVINO NPU document/image triage prototype -Local-only prototype for triaging screenshots, photos/scans, and PDF page images. +Local-only, CLI-first prototype for triaging screenshots, photos/scans, and PDF page images. It returns structured JSON metadata and explicitly reports CPU vs NPU stages. +Optional HTTP is a localhost-only prototype on `127.0.0.1:18829` when explicitly started; it is not a live Atlas/Hermes/RAG integration. Location: `/home/will/lab/swarm/openvino-doc-image-triage-npu/` @@ -13,6 +14,8 @@ Location: `/home/will/lab/swarm/openvino-doc-image-triage-npu/` - Full source paths are omitted by default; responses include basename and SHA-256. - Allowed roots are enforced for CLI/server requests. - This prototype does not mutate Obsidian, RAG, Chroma, vector collections, routing, or gateway services. +- Do not process broad private document/image directories; use generated synthetic fixtures unless Will explicitly approves a narrow source root. +- See `SPEC.md` for the full CLI contract, smoke-test plan, NPU verification plan, docs implications, and no-go/defer criteria. 
## CPU vs NPU stages @@ -88,25 +91,25 @@ Include OCR/sidecar text in a single response only when explicitly requested: ## HTTP usage -Check that port 18820 is free first: +HTTP is optional and not enabled by default. Check that port 18829 is free first: ```bash -ss -ltnp | grep ':18820\b' || true +ss -ltnp | grep ':18829\b' || true ``` Start local-only server: ```bash cd /home/will/lab/swarm/openvino-doc-image-triage-npu -/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18820 --allowed-root "$PWD" +/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18829 --allowed-root "$PWD" ``` Call it: ```bash -curl -sS http://127.0.0.1:18820/healthz | jq -curl -sS http://127.0.0.1:18820/models | jq -curl -sS -X POST http://127.0.0.1:18820/triage \ +curl -sS http://127.0.0.1:18829/healthz | jq +curl -sS http://127.0.0.1:18829/models | jq +curl -sS -X POST http://127.0.0.1:18829/triage \ -H 'Content-Type: application/json' \ -d '{"path":"/home/will/lab/swarm/openvino-doc-image-triage-npu/samples/synthetic_invoice.png","options":{"allowed_roots":["/home/will/lab/swarm/openvino-doc-image-triage-npu"]}}' | jq ``` diff --git a/openvino-doc-image-triage-npu/SPEC.md b/openvino-doc-image-triage-npu/SPEC.md new file mode 100644 index 0000000..07885e0 --- /dev/null +++ b/openvino-doc-image-triage-npu/SPEC.md @@ -0,0 +1,146 @@ +# OpenVINO NPU document/image triage spec + +Status: CLI-first prototype specification; not a live Atlas/Hermes integration. + +## Safety stance + +- Default workflow is local CLI execution against explicitly named files. +- Optional HTTP is disabled unless a human starts it, binds to localhost, and is intended for `127.0.0.1:18829` only. +- No persistent systemd unit, Docker service, gateway hook, Atlas/Hermes route, RAG route, Chroma/vector collection mutation, or in-place reindexing is part of this spec. +- Smoke data must be synthetic/non-private only. 
Do not point this tool at Will's private document, image, screenshot, Downloads, Desktop, Obsidian, or photo-library directories without explicit approval. +- NPU claims require `/sys/class/accel/accel0/device/npu_busy_time_us` before/after deltas. HTTP 200, JSON output, or model-load success alone is not NPU proof. + +## Recommended model/runtime + +Recommended v1 runtime: + +- File intake, hashing, MIME/extension checks, image/PDF rendering, sidecar/native PDF text extraction, metadata extraction, and category fallback: local Python CPU path using Pillow plus optional `pypdf`/`pypdfium2`. +- Needs-attention semantic check: reuse the live localhost OpenVINO embeddings service on `127.0.0.1:18817`, currently `bge-base-en-v1.5-int8-ov`, and verify each embedding call with `npu_busy_time_us` deltas. +- Category classification in v1: CPU rule fallback, explicitly reported as not an NPU image model. + +Why this is the recommended v1: + +- It avoids private-data exposure: no external upload path and no broader local file scanning. +- It avoids collection/routing risk by using the existing embeddings API as a stateless feature extractor only; it does not write to RAG or Chroma. +- It gives a real NPU verification hook for the semantic stage without overclaiming that OCR/image classification are NPU-backed. +- It keeps the prototype useful even when optional PDF dependencies or the embeddings service are unavailable: it can fall back to CPU-only metadata/rule output and mark NPU verification false. + +Deferred model work: + +- NPU image category classifier: defer until a static-shape OpenVINO IR image model such as MobileNet/EfficientNet/ResNet is selected, calibrated for the label set, and smoke-tested with busy-time deltas. +- NPU OCR/VLM: defer; OCR remains local CPU text plumbing in v1. 
+ +## CLI contract + +Command: + +```bash +cd /home/will/lab/swarm/openvino-doc-image-triage-npu +/home/will/.venvs/npu/bin/python triage.py \ + --allowed-root /home/will/lab/swarm/openvino-doc-image-triage-npu \ + --max-pages 3 \ + --pretty \ + samples/synthetic_invoice.png samples/synthetic_invoice.pdf +``` + +Inputs: + +- Positional `paths`: one or more local image/PDF paths. +- `--allowed-root ROOT`: may repeat; every requested path must resolve under one of these roots. Default is current directory. +- `--max-pages N`: maximum rendered/extracted PDF pages; default 3. +- `--no-embeddings`: disables the localhost `:18817` embedding/NPU check and reports CPU fallback/no text. +- `--dry-run`: skip image/PDF rendering while still checking intake/hash/text/metadata where available. +- `--include-ocr-text`: include raw extracted/sidecar text in this single response only; off by default. +- `--include-full-path`: include resolved full paths; off by default. +- `--pretty`: pretty-print JSON. + +Output: + +- Batch JSON: `{ "ok": bool, "files": [...], "generated_at": "..." }`. +- Per-file result includes `file_id` as `sha256:<hex-digest>`, `source_path_basename`, media type, file size, pages, classification, needs-attention result, metadata counts/flags, privacy flags, and processing-device summary. +- Raw OCR/text and full paths are omitted unless explicitly requested. +- NPU evidence is per embedding call: `used`, `verified_npu`, `npu_busy_delta_us`, endpoint, and wall time. + +Exit behavior: + +- Exit 0 when all files triage successfully. +- Exit 2 when one or more files fail policy/intake/processing checks. + +## Optional localhost HTTP contract + +HTTP is optional and not enabled by this spec. 
If explicitly started for a smoke or local demo, use localhost and port 18829: + +```bash +cd /home/will/lab/swarm/openvino-doc-image-triage-npu +ss -ltnp | grep ':18829\b' || true +/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18829 --allowed-root "$PWD" +``` + +Endpoints: + +- `GET /healthz` or `/health`: service name, bind policy, configured allowed roots, privacy flags, and current `npu_busy_time_us`. +- `GET /models`: reports v1 stages and whether each is CPU or NPU-backed. +- `POST /triage`: `{ "path": "/local/file", "options": {...} }` -> `{ "ok": true, "result": ... }`. +- `POST /triage/batch`: `{ "paths": ["/local/file"], "options": {...} }` -> batch JSON. + +HTTP privacy/policy rules: + +- Server startup `--allowed-root` is the outer allowlist. +- Request `options.allowed_roots` may narrow that allowlist but must not widen it. +- Request `options.embedding_url` may only target the configured local loopback embeddings route `http://127.0.0.1:18817/v1/embeddings` (or localhost equivalent); external or alternate endpoints are rejected. +- Request bodies and raw text are not logged by the stdlib handler. +- Stop the temporary server after the smoke/demo. + +## Synthetic smoke-test plan + +Use only generated fixtures under the prototype directory: + +```bash +cd /home/will/lab/swarm/openvino-doc-image-triage-npu +/home/will/.venvs/npu/bin/python make_samples.py +/home/will/.venvs/npu/bin/python tests/smoke_test.py +``` + +Expected smoke coverage: + +- Creates synthetic invoice/receipt/form-like image/PDF fixtures. +- Runs CLI triage against the synthetic invoice image/PDF under an explicit allowed root. +- Asserts privacy flags (`external_uploads: false`, no full path by default). +- Asserts invoice category/needs-attention behavior on synthetic text. 
+- Starts a temporary localhost HTTP server on a dedicated, non-live smoke port, calls `/healthz` and `/triage`, verifies no full path leakage, rejects attempts to widen allowed roots, and rejects external embedding URLs. +- Terminates the temporary server. + +The smoke port in tests should stay a dedicated, non-live port (currently fixed at `18828`) to avoid claiming `18829` as a persistent service. + +## NPU busy-time verification plan + +For every test that claims NPU use: + +1. Read `/sys/class/accel/accel0/device/npu_busy_time_us` before the operation. +2. Perform an operation that should call the live embeddings service on `127.0.0.1:18817` with non-empty synthetic text. +3. Read `npu_busy_time_us` after the operation. +4. Require both: + - the per-result embedding object reports `used: true`, `verified_npu: true`, and `npu_busy_delta_us > 0`; and + - the outer before/after sysfs value increased. +5. If sysfs is missing or `:18817` is unavailable, do not claim NPU success; report CPU fallback / embedding unavailable and keep the smoke result honest. + +## Docs and diagram implications + +- Service maps should list document/image triage as CLI-first and optional prototype `127.0.0.1:18829`, not live unless explicitly started. +- Diagrams must not draw live Atlas/Hermes/gateway/RAG routing to this triage lane. +- If shown with other candidate sidecars, label it separately from live services: live baseline remains RAG `:18810`, Whisper NPU `:18816`, and embeddings `:18817`; prototype sidecars are reranker `:18818`, classifier/router `:18819`, GenAI worker `:18820`, and optional doc/image triage `:18829`. +- Runbooks should include CLI smoke, localhost listener checks, busy-time delta verification, and server shutdown instructions. +- Documentation should state CPU vs NPU stages explicitly so the prototype does not imply NPU OCR or NPU image classification. 
+ +## No-go / defer criteria + +Do not proceed to implementation, live integration, or persistent service enablement if any of these are true: + +- The step involves live routing or persistent service enablement that Will has not explicitly approved. +- The requested source path is a private document/image directory or broad home-directory scan rather than synthetic fixtures or an explicitly approved narrow root. +- The workflow would mutate Obsidian, RAG, Chroma/vector collections, or reindex in place. +- The optional server would need to bind anywhere other than localhost. +- NPU busy-time does not increase for an operation being described as NPU-backed. +- Raw OCR text or full paths would be logged, uploaded, stored durably, or returned without explicit request. +- PDF/image dependencies are missing and the task requires rendered page analysis rather than metadata/text-only fallback. +- The task depends on an NPU image classifier/OCR/VLM model that has not yet been selected, converted/quantized to OpenVINO, calibrated for the task, and verified on synthetic fixtures with busy-time deltas. 
diff --git a/openvino-doc-image-triage-npu/server.py b/openvino-doc-image-triage-npu/server.py index e95b6a1..96e62b0 100644 --- a/openvino-doc-image-triage-npu/server.py +++ b/openvino-doc-image-triage-npu/server.py @@ -163,7 +163,7 @@ class Handler(BaseHTTPRequestHandler): def main() -> int: parser = argparse.ArgumentParser(description="Local-only doc/image triage HTTP server") parser.add_argument("--host", default=os.environ.get("DOC_IMAGE_TRIAGE_HOST", "127.0.0.1")) - parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18820"))) + parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18829"))) parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; may repeat") args = parser.parse_args() roots = [Path(p).expanduser().resolve() for p in args.allowed_root] or [Path.cwd().resolve()] From cb874f9743848208bed902d8ad36392f3187e6ef Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:06:28 -0700 Subject: [PATCH 2/6] docs: define OpenVINO GenAI NPU worker contract --- openvino-genai-npu-worker/CONTRACT.md | 299 ++++++++++++++++++++++++++ openvino-genai-npu-worker/README.md | 1 + 2 files changed, 300 insertions(+) create mode 100644 openvino-genai-npu-worker/CONTRACT.md diff --git a/openvino-genai-npu-worker/CONTRACT.md b/openvino-genai-npu-worker/CONTRACT.md new file mode 100644 index 0000000..babebbb --- /dev/null +++ b/openvino-genai-npu-worker/CONTRACT.md @@ -0,0 +1,299 @@ +# Bounded OpenVINO GenAI NPU worker contract + +Status: proposed prototype contract; not a live Atlas/Hermes routing dependency. +Default address: `http://127.0.0.1:18820`. + +## Purpose and hard boundary + +This worker is a local-only sidecar for small, bounded generation jobs that are useful around the assistant stack but are not primary chat: title drafting, short summaries, notification condensation, and memory-candidate extraction. 
It must not be used as Atlas/Hermes primary model routing, gateway fallback routing, autonomous tool-calling, or an unbounded chat endpoint without a separate approval gate. + +Hard boundaries: + +- Bind to `127.0.0.1` by default; non-local bind is a code/ops review item, not a runtime flag to casually change. +- Do not enable a persistent systemd/Docker service as part of smoke testing. +- Do not restart or reconfigure Atlas, Hermes, gateway, LiteLLM, RAG, or n8n routing to call this worker without explicit approval from Will. +- Do not write memory, mutate Chroma/vector collections, trigger RAG reindexing, or process private document/image directories. +- Do not log raw prompts or raw request bodies by default. +- Treat HTTP success as insufficient for NPU claims; require positive `/sys/class/accel/accel0/device/npu_busy_time_us` delta for generation. + +## Recommended model/runtime + +Recommended first model: + +- Model id: `OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov` +- Local path: `/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov` +- Runtime: `/home/will/.venvs/npu` with `openvino-genai==2026.2.0.0` +- Device: OpenVINO GenAI `NPU` +- Compile cache: `/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4` + +Why this model/runtime: + +- It is already staged in the repo prototype and has a local smoke observation with positive NPU busy-time delta. +- It is an OpenVINO IR model with INT4-compressed weights, which keeps memory/compile pressure low enough for a sidecar on the shared NPU. +- Qwen2.5-1.5B-Instruct is large enough for formatting/summarization/notification jobs but small enough to keep latency bounded. It should not be marketed as a high-quality general assistant model. +- The Hugging Face model card identifies it as Qwen2.5-1.5B-Instruct converted to OpenVINO IR with INT4_SYM NNCF weight compression and states compatibility with OpenVINO 2025.1.0+; the local runtime is newer than that baseline. 
+- OpenVINO GenAI `LLMPipeline` is the right first runtime because the existing local NPU stack already uses OpenVINO GenAI successfully for Whisper, and it exposes a simple bounded generate call with cache controls. + +Deferred alternatives: + +- Larger 3B/7B local LLMs: defer until the 1.5B contract proves stable; larger models increase compile time, memory pressure, and NPU contention. +- CPU/GPU fallback inside this service: defer; fallback would blur the NPU verification contract. If fallback is later approved, return `device_actual` and keep NPU-only health separate. +- Manual `EXPORT_BLOB`/`BLOB_PATH`: defer until compile latency is proven to dominate despite `CACHE_DIR`. If used later, record OpenVINO version, NPU compiler/driver versions, model id, quantization flags, and source model path; invalidate after OpenVINO/NPU driver upgrades. + +## Runtime bounds + +Pipeline configuration for the first milestone: + +```text +CACHE_DIR=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4 +MAX_PROMPT_LEN=1024 +MIN_RESPONSE_LEN=64 +PREFILL_HINT=DYNAMIC +GENERATE_HINT=FAST_COMPILE +``` + +Request bounds: + +- `input`: required non-empty string; max `6000` characters before prompt templating. +- `job`: one of `title`, `summary`, `notification`, `memory_candidate`. +- `max_new_tokens`: optional; default by job; hard max `256`. +- Concurrency: generation must be serialized inside the process with a lock because the NPU is shared with Whisper/embeddings/prototype sidecars. +- Logging: log method/path/status and timing only; never log raw `input` or generated text by default. + +Expected latency target: + +- Cold-ish first generation with cache available: acceptable if roughly 15 seconds or less for a short prompt on the staged model. +- Warm short jobs: target under 5 seconds for `title`/`notification` and under 10 seconds for `summary`/`memory_candidate`. 
+- Defer promotion if p95 warm latency exceeds 15 seconds for 24-96 generated tokens, or if cold compile regularly blocks the NPU long enough to degrade live Whisper/embeddings. + +These are prototype acceptance targets, not SLOs for live Atlas routing. + +## CLI contract + +Command shape: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +/home/will/.venvs/npu/bin/python worker.py \ + --job title \ + --input 'Synthetic non-private text to title.' \ + --max-new-tokens 32 +``` + +CLI stdout is JSON with the same response shape as HTTP generation. Exit code must be: + +- `0` when the job succeeds and `npu_busy_delta_us > 0`. +- non-zero when input validation fails, model load/generation fails, or NPU busy-time delta is not positive. + +The CLI must not write memory, change service routing, or start persistent services. + +## HTTP contract + +Start temporary local server only: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +/home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820 +``` + +Endpoints: + +```text +GET /healthz +GET /models +POST /v1/worker/generate +POST /v1/worker/extract-memory-candidates +POST /v1/worker/condense-notification +``` + +`GET /healthz` response fields: + +```json +{ + "ok": true, + "model": "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov", + "model_path": "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov", + "device": "NPU", + "cache_dir": "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4", + "cache_exists": true, + "loaded": false, + "initial_load_ms": null, + "busy_time_us": 0, + "max_input_chars": 6000, + "jobs": ["memory_candidate", "notification", "summary", "title"], + "bind": "127.0.0.1:18820" +} +``` + +`POST /v1/worker/generate` request: + +```json +{ + "job": "summary", + "input": "Synthetic non-private text to summarize.", + "max_new_tokens": 80 +} +``` + +Specialized aliases: + +- `POST /v1/worker/extract-memory-candidates` implies `job=memory_candidate`. 
+- `POST /v1/worker/condense-notification` implies `job=notification`. +- Backward-compatible request `job=memory` may map to `memory_candidate`, but new clients should use `memory_candidate`. + +Successful generation response: + +```json +{ + "model": "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov", + "device": "NPU", + "job": "summary", + "text": "...", + "json": null, + "timing_ms": { + "load": 0.0, + "initial_load": 10989.08, + "generate": 3157.94, + "total": 3157.94 + }, + "npu_busy_delta_us": 2650724, + "npu_busy_before_us": 123, + "npu_busy_after_us": 2650847, + "cache_dir": "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" +} +``` + +Validation/error behavior: + +- Unsupported path: `404` JSON `{"error":"not found"}`. +- Unsupported job, empty input, too-long input, invalid token bound, missing model, or generation failure: JSON `{"error":"..."}` with non-2xx preferred for future implementations. The current stdlib prototype returns `400` for these errors. +- If `npu_busy_delta_us <= 0`, the response should be treated as failed by smoke tests even if an HTTP handler emitted `200`. + +## Prompt/job contract + +`title`: + +- Input: short task/log/message excerpt. +- Output: one title, 8 words or fewer, no markdown required. +- Default `max_new_tokens`: 32. + +`summary`: + +- Input: synthetic/non-private text excerpt. +- Output: one short paragraph or up to 4 bullets. +- Default `max_new_tokens`: 160. + +`notification`: + +- Input: synthetic/non-private alert/log excerpt. +- Output target: JSON object with `severity`, `category`, `summary`, `action_needed`. +- Default `max_new_tokens`: 96. +- Client must tolerate `json: null` and parse/validate before using output. + +`memory_candidate`: + +- Input: synthetic/non-private conversation excerpt. +- Output target: JSON object with `candidates` and `notes`; candidates are proposals only. +- Default `max_new_tokens`: 192. +- This worker must never call Hermes memory tools or write durable memory directly. 
+ +## Smoke-test plan using non-private data + +Do not use private vault notes, screenshots, email, chat logs, or document/image directories. Use synthetic text like this: + +```text +Atlas received a kanban notification that an OpenVINO NPU prototype finished smoke testing. The reviewer needs a concise status and next action. No live gateway routing changed. +``` + +Direct NPU smoke: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +before=$(cat /sys/class/accel/accel0/device/npu_busy_time_us) +/home/will/.venvs/npu/bin/python smoke_llm_npu.py \ + --prompt 'Write a concise title for: synthetic NPU worker contract smoke.' \ + --max-new-tokens 24 +status=$? +after=$(cat /sys/class/accel/accel0/device/npu_busy_time_us) +printf 'external_busy_delta_us=%s\n' "$((after-before))" +test "$status" -eq 0 +test "$((after-before))" -gt 0 +``` + +Temporary HTTP smoke: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +/home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820 & +pid=$! 
+trap 'kill "$pid" 2>/dev/null || true' EXIT + +curl -fsS --retry 10 --retry-delay 1 --retry-connrefused http://127.0.0.1:18820/healthz | python -m json.tool +before=$(cat /sys/class/accel/accel0/device/npu_busy_time_us) +curl -fsS http://127.0.0.1:18820/v1/worker/generate \ + -H 'Content-Type: application/json' \ + -d '{"job":"title","input":"Synthetic NPU worker smoke with no routing changes.","max_new_tokens":24}' \ + | tee /tmp/openvino-genai-worker-smoke.json \ + | python -m json.tool +after=$(cat /sys/class/accel/accel0/device/npu_busy_time_us) +python - <<'PY' +import json +p=json.load(open('/tmp/openvino-genai-worker-smoke.json')) +assert p['npu_busy_delta_us'] > 0, p +assert p['device'] == 'NPU', p +PY +test "$((after-before))" -gt 0 +kill "$pid" +trap - EXIT +``` + +Also verify the temporary listener is gone: + +```bash +ss -ltnp | grep ':18820' && { echo 'temporary smoke server still running'; exit 1; } || true +``` + +## NPU busy-time verification plan + +Acceptance for any NPU claim requires all of the following: + +1. Confirm the sysfs counter exists and is readable: + `test -r /sys/class/accel/accel0/device/npu_busy_time_us`. +2. Read `busy_before` immediately before the generation call. +3. Run exactly one bounded generation against the candidate worker. +4. Read `busy_after` immediately after generation completes. +5. Require `busy_after > busy_before` and response `npu_busy_delta_us > 0`. +6. Record model id, runtime version, prompt chars, max tokens, load/generate timings, and busy delta in the review handoff. +7. If the counter is unchanged, mark the smoke as failed even if HTTP returned `200` and text was generated. + +Because the NPU is shared, a positive external delta proves NPU activity during the window but not exclusive attribution. Prefer a quiet window with no concurrent Whisper/embedding jobs for review-grade measurements; otherwise repeat and compare worker-reported internal delta with the external counter. 
+ +## Docs/diagram implications + +If this worker is kept as a prototype, docs and diagrams should show: + +- Live baseline remains RAG `:18810`, Whisper NPU `:18816`, embeddings `:18817`. +- GenAI worker `:18820` is proposed/prototype/not-live unless explicitly approved and enabled. +- No arrow from Hermes/Atlas gateway or LiteLLM primary routing to `:18820` unless a later approved integration actually exists. +- Runbooks should include the CLI/HTTP smoke commands, `ss` listener checks, and NPU busy-time counter checks. +- Service maps should label this as "bounded background generation" rather than "chat" or "assistant model". + +## Explicit no-go / defer criteria + +No-go for implementation or promotion: + +- Model path missing, OpenVINO GenAI import fails, or NPU device is unavailable. +- `/sys/class/accel/accel0/device/npu_busy_time_us` is unreadable or does not increase during generation. +- Warm bounded jobs exceed the prototype latency target or starve live Whisper/embedding services. +- The worker needs private documents/images/chat logs for smoke testing. +- The worker requires Atlas/Hermes/gateway/LiteLLM/RAG routing changes to demonstrate value. +- The API starts accepting arbitrary chat history, tool-call instructions, unbounded prompts, or large outputs. +- The service logs raw prompt bodies by default. +- Persistent service enablement is requested without an explicit Will approval gate and a reviewer smoke handoff. + +Defer, do not solve in this lane: + +- Primary assistant routing, LiteLLM model registration, gateway fallback, or tool-calling integration. +- RAG query rewriting, RAG answer generation, or collection mutation. +- Private document/image triage. +- Multi-model selection, CPU/GPU fallback policy, batching, streaming, or auth exposure beyond localhost. 
diff --git a/openvino-genai-npu-worker/README.md b/openvino-genai-npu-worker/README.md index c7b241b..1baf519 100644 --- a/openvino-genai-npu-worker/README.md +++ b/openvino-genai-npu-worker/README.md @@ -15,6 +15,7 @@ The worker does not write memory, does not restart Atlas/Hermes, does not change ## Files +- `CONTRACT.md` — bounded-worker service contract, endpoint/CLI API, smoke plan, NPU verification, docs implications, and no-go criteria. - `worker.py` — stdlib HTTP API plus CLI wrapper. - `smoke_llm_npu.py` — direct GenAI smoke test with NPU busy-time verification. - `systemd/openvino-genai-npu-worker.service` — optional user-service template; not installed by this prototype. From 4cf3414fdbd1beda451594a70182d810dc70499e Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:06:31 -0700 Subject: [PATCH 3/6] docs: specify OpenVINO router classifier contract --- openvino-classifier-npu/CONTRACT.md | 331 ++++++++++++++++++++++++++++ openvino-classifier-npu/README.md | 4 + 2 files changed, 335 insertions(+) create mode 100644 openvino-classifier-npu/CONTRACT.md diff --git a/openvino-classifier-npu/CONTRACT.md b/openvino-classifier-npu/CONTRACT.md new file mode 100644 index 0000000..8e29eac --- /dev/null +++ b/openvino-classifier-npu/CONTRACT.md @@ -0,0 +1,331 @@ +# OpenVINO NPU classifier/router dry-run contract + +Status: specification for dry-run prototype refresh +Target port: `127.0.0.1:18819` +Owner context: Atlas/Hermes local assistant sidecar evaluation + +This service is an advisory classifier for Atlas/Hermes automation hints. It may suggest labels such as tool-needed, memory-candidate type, urgency, workflow category, and safety-confirmation-required, but it must not make or enforce live routing, memory, tool, or safety decisions without a separate explicit approval from Will. 
+ +## Recommended model and runtime + +Recommended v1 runtime: small local Python HTTP/CLI service backed by the existing OpenVINO NPU embeddings service on `127.0.0.1:18817`. + +Recommended v1 model shape: + +- Primary signal: `bge-base-en-v1.5-int8-ov` embeddings from the live embeddings service. +- Classifier layer: inspectable deterministic rules plus cosine similarity against curated synthetic/prototype utterances. +- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0`. +- Device proof: request-level `npu_busy_delta_us` from `:18817` plus direct sysfs before/after reads from `/sys/class/accel/accel0/device/npu_busy_time_us`. + +Why this is preferred for the dry run: + +1. It reuses the already-live NPU embeddings path rather than adding a second model conversion/runtime dependency before contract validation. +2. Rules and prototypes are transparent enough for safety-sensitive routing hints; a reviewer can inspect why a message was labeled. +3. It avoids fine-tuning or training on private Atlas/Hermes transcripts. +4. It keeps the service small, localhost-only, and easy to start/stop during smoke tests. +5. It produces NPU activity through the embeddings path while making clear that final decision logic remains advisory. + +Defer a dedicated NPU sequence-classification model such as TinyBERT/MiniLM until the dry-run labels and thresholds have been evaluated against synthetic fixtures and explicitly-approved non-private examples. If pursued later, use OpenVINO Runtime/Optimum export with fixed input shapes suitable for NPU, and keep the rule layer for safety gates. + +## Non-goals and safety invariants + +The service must not: + +- Change Hermes/Atlas model routing, gateway routing, memory writes, tool-use permissions, or safety-confirmation behavior. +- Restart, stop, enable, or persist any live Atlas/Hermes/gateway/RAG service. +- Bind to anything broader than `127.0.0.1` by default. 
+- Mutate Chroma/vector collections, trigger reindexing, or write to RAG state. +- Process private document/image directories or private transcript dumps for smoke testing. +- Log raw prompts by default beyond normal foreground stderr during local review. +- Claim NPU success from HTTP 200 alone. + +## Endpoint contract + +All HTTP endpoints are local-only by default. + +Base URL: + +```text +http://127.0.0.1:18819 +``` + +### GET `/healthz`, `/health`, `/readyz`, `/` + +Purpose: liveness/readiness metadata. + +Response fields: + +- `status`: `starting | ok` +- `service`: `atlas-router-classifier` +- `version`: service version string +- `mode`: always `dry_run` +- `model`: model/runtime label +- `embed_url`: upstream embeddings URL +- `device`: expected to say `NPU-via-embedding-service` or equivalent +- `labels`: supported label names +- `embedding_dim`: embedding dimension after warmup +- `prototype_count`: number of synthetic prototype examples loaded +- `prototype_npu_busy_delta_us`: warmup delta reported by upstream embeddings, if available +- `npu_busy_time_us`: current sysfs counter value, if readable +- `warnings`: list of non-fatal warnings + +A healthy service is not enough to prove NPU execution. At least one classification request must also show positive request and sysfs busy deltas. + +### GET `/v1/labels` + +Purpose: publish schema information without dumping private examples. 
+ +Response fields: + +- `model` +- `thresholds` + - `tool_needed`: recommended threshold `0.72` + - `memory_candidate`: recommended threshold `0.78` + - `safety_confirmation_required`: recommended threshold `0.80` + - `workflow_category`: recommended threshold `0.52` +- `enums` + - `memory_candidate`: `none`, `user_preference`, `durable_user_fact`, `environment_fact`, `workflow_convention`, `skill_candidate` + - `urgency`: `low`, `normal`, `high`, `critical` + - `workflow_category`: `chat`, `research`, `coding`, `debugging`, `devops`, `smart_home`, `media`, `note_taking`, `productivity`, `kanban`, `unknown` +- `prototype_ids`: names of curated synthetic prototype buckets + +### POST `/v1/classify` + +Purpose: classify one user/task message for advisory dry-run hints. + +Request: + +```json +{ + "id": "optional-trace-id", + "text": "Urgent: check whether port 18817 is listening and inspect systemd logs.", + "context": { + "platform": "cli", + "source": "user" + }, + "options": { + "include_evidence": true, + "include_embedding_debug": false, + "dry_run": true + } +} +``` + +Required behavior: + +- Reject empty text with HTTP 400. +- Default `dry_run` to true. +- Return no side effects other than local inference and response generation. +- Include evidence by default unless `include_evidence=false`. +- Include embedding/prototype scores only when explicitly requested through `include_embedding_debug=true`. 
+ +Response: + +```json +{ + "id": "optional-trace-id", + "model": "bge-base-en-v1.5-int8-ov/prototype-router-v0", + "created": 1780590000, + "duration_ms": 12.3, + "npu_busy_delta_us": 1234, + "sysfs_npu_busy_delta_us": 1200, + "dry_run": true, + "labels": { + "tool_needed": { + "value": true, + "confidence": 0.84, + "threshold": 0.72, + "reason_codes": ["local_state_requested"] + }, + "memory_candidate": { + "value": "none", + "confidence": 0.31, + "threshold": 0.78, + "reason_codes": [] + }, + "urgency": { + "value": "high", + "confidence": 0.84, + "scores": {"low": 0.0, "normal": 0.2, "high": 0.84, "critical": 0.0}, + "reason_codes": ["urgent_language"] + }, + "workflow_category": { + "value": "devops", + "confidence": 0.86, + "scores": {"devops": 0.86, "unknown": 0.14} + }, + "safety_confirmation_required": { + "value": false, + "confidence": 0.0, + "threshold": 0.8, + "reason_codes": [] + } + }, + "warnings": [], + "evidence": [] +} +``` + +### POST `/v1/batch_classify` + +Purpose: classify a bounded batch of non-private synthetic or explicitly-approved messages. + +Request: + +```json +{ + "items": [ + {"id": "m1", "text": "What time is it in Seattle right now?"}, + {"id": "m2", "text": "Restart the live Atlas gateway and switch primary routing."} + ], + "options": {"include_evidence": false, "dry_run": true} +} +``` + +Response: + +- `model` +- `duration_ms` +- aggregate `npu_busy_delta_us` +- `results`: array of `/v1/classify` responses + +Batch limits for prototype review: + +- Keep batches small, ideally <= 32 items. +- Use only synthetic fixtures unless Will explicitly approves a real non-private sample set. +- Do not retain request bodies to disk. 
+ +## CLI contract + +The same implementation should support foreground review from the service directory: + +```bash +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py \ + --host 127.0.0.1 \ + --port 18819 \ + --embed-url http://127.0.0.1:18817/v1/embeddings +``` + +Required flags/env: + +- `--host` / `OPENVINO_CLASSIFIER_HOST`; default `127.0.0.1`. +- `--port` / `OPENVINO_CLASSIFIER_PORT`; default `18819`. +- `--embed-url` / `OPENVINO_CLASSIFIER_EMBED_URL`; default `http://127.0.0.1:18817/v1/embeddings`. +- `--timeout-s` / `OPENVINO_CLASSIFIER_TIMEOUT_S`; default `30`. +- `--no-warmup` to defer prototype embedding until first request. + +A future dedicated CLI mode may be added for one-shot JSONL classification, but foreground HTTP review is sufficient for the dry-run contract. + +## Synthetic smoke-test plan + +Preconditions: + +1. Confirm `:18817` embeddings service is healthy. +2. Confirm `:18819` is not already listening. +3. Read `/sys/class/accel/accel0/device/npu_busy_time_us` before starting the request smoke. +4. Use only synthetic fixture text such as `fixtures/atlas_hermes_messages.jsonl`. + +Unit/schema smoke, no NPU dependency: + +```bash +cd /home/will/lab/swarm +/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v +``` + +Foreground service smoke: + +```bash +ss -ltnp | grep ':18819\b' || true +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819 +``` + +From another shell: + +```bash +curl -fsS http://127.0.0.1:18819/healthz | jq . +curl -fsS http://127.0.0.1:18819/v1/labels | jq . +curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke-devops","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true,"dry_run":true}}' | jq . 
+curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke-safety","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","options":{"include_evidence":true,"dry_run":true}}' | jq . +``` + +Expected label checks: + +- `smoke-devops`: `tool_needed.value=true`, `urgency.value=high`, `workflow_category.value=devops`. +- `smoke-safety`: `safety_confirmation_required.value=true`, no actual restart or routing change. +- Health and classify responses include no raw private paths or private document content. + +Shutdown: + +- Stop the foreground server with Ctrl-C. +- Re-run `ss -ltnp | grep ':18819\b' || true` and confirm no listener remains. + +## NPU busy-time verification plan + +Use sysfs plus service response fields; do not accept HTTP 200 alone. + +```bash +BUSY=/sys/class/accel/accel0/device/npu_busy_time_us +before=$(cat "$BUSY") +response=$(curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"npu-proof","text":"Check current systemd service status for the embeddings service.","options":{"include_evidence":false,"dry_run":true}}') +after=$(cat "$BUSY") +echo "$response" | jq '{npu_busy_delta_us, sysfs_npu_busy_delta_us, warnings}' +echo "outer_sysfs_npu_busy_delta_us=$((after-before))" +``` + +Acceptance for an NPU-backed classification request: + +- HTTP request succeeds. +- Response `npu_busy_delta_us > 0` from upstream embeddings. +- Response `sysfs_npu_busy_delta_us > 0` when sysfs is readable. +- Outer shell `after-before > 0`. +- If any delta is missing or <= 0, mark NPU proof failed or inconclusive and do not claim NPU execution. + +## Docs and diagram implications + +If this prototype is refreshed or reviewed, update documentation to show: + +- Live baseline remains RAG `:18810`, RAG health `:18814`, Whisper NPU `:18816`, and embeddings `:18817`. 
+- Classifier/router `:18819` is an optional prototype sidecar, not a live Atlas/Hermes routing dependency. +- Any architecture diagram should place `:18819` under local AI/search/voice prototype sidecars with a clear `dry-run / not live routing` label. +- Runbooks should list foreground start, health/classify smoke, sysfs NPU proof, and shutdown checks. +- Service catalog entries should state `not installed/enabled` until Will approves persistent service enablement. +- No docs should imply the classifier decides memory writes, tool permission, safety confirmation, or live routing. + +Relevant docs inventory: + +- `docs/swarm-infrastructure.md` +- `docs/swarm-infrastructure.html` +- `docs/diagram-maintenance.md` +- `swarm-common/obsidian-vault/will/will-shared-zap/Runbooks/OpenVINO NPU Services Runbook.md` +- `swarm-common/obsidian-vault/will/will-shared-zap/Resources/Service Catalog.md` + +## No-go / defer criteria + +Do not proceed to implementation refresh, persistent service enablement, or live integration if any of the following hold: + +- `:18817` embeddings is unavailable and no approved NPU embedding fallback exists. +- `/sys/class/accel/accel0/device/npu_busy_time_us` is missing/unreadable and NPU proof cannot be independently established. +- Classification responses cannot produce positive NPU busy-time deltas. +- `:18819` is already occupied by an unknown or live service. +- Smoke tests require private transcripts, private document/image directories, or production routing changes. +- Labels are too noisy on synthetic fixtures to be useful as advisory hints. +- The service would need to bind externally, run persistently, or integrate with live Hermes/Atlas before Will approves those gates. +- Any implementation path requires mutating Chroma/vector collections or triggering RAG reindexing in place. + +## Implementation handoff notes + +Recommended next engineer actions: + +1. 
Verify or refresh `openvino-classifier-npu/router_classifier.py` to match this contract. +2. Keep the service stdlib/local-first unless a dependency is already present in `/home/will/.venvs/npu`. +3. Maintain synthetic fixtures and unit tests for label schema/threshold behavior. +4. Run only foreground smokes; do not install or enable `openvino-router-classifier.service`. +5. Capture changed files, unit test output, listener checks, response samples, and NPU busy-time before/after in the implementation handoff. diff --git a/openvino-classifier-npu/README.md b/openvino-classifier-npu/README.md index 1d42223..74654cb 100644 --- a/openvino-classifier-npu/README.md +++ b/openvino-classifier-npu/README.md @@ -2,6 +2,10 @@ Dry-run Atlas/Hermes message classifier/router prototype. +The detailed dry-run contract is in [`CONTRACT.md`](./CONTRACT.md), including the +recommended model/runtime, HTTP/CLI schema, smoke-test plan, NPU busy-time proof, +docs/diagram implications, and no-go/defer criteria. + It reuses the existing OpenVINO NPU embeddings service on `127.0.0.1:18817` and serves an inspectable stdlib HTTP API on `127.0.0.1:18819`. 
It does not change live Hermes/Atlas routing, write memory, mutate vector collections, restart From 582e0ee553e305db1f57ef38b343d1b26e8cb2fb Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:10:18 -0700 Subject: [PATCH 4/6] fix: harden OpenVINO doc triage prototype --- openvino-doc-image-triage-npu/README.md | 4 +- openvino-doc-image-triage-npu/SPEC.md | 6 +-- openvino-doc-image-triage-npu/server.py | 22 ++++++++++- .../tests/smoke_test.py | 39 ++++++++++++++++--- 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/openvino-doc-image-triage-npu/README.md b/openvino-doc-image-triage-npu/README.md index f21d7be..b5847a6 100644 --- a/openvino-doc-image-triage-npu/README.md +++ b/openvino-doc-image-triage-npu/README.md @@ -2,7 +2,7 @@ Local-only, CLI-first prototype for triaging screenshots, photos/scans, and PDF page images. It returns structured JSON metadata and explicitly reports CPU vs NPU stages. -Optional HTTP is a localhost-only prototype on `127.0.0.1:18829` when explicitly started; it is not a live Atlas/Hermes/RAG integration. +Optional HTTP is a localhost/loopback-only prototype on `127.0.0.1:18829` when explicitly started; non-loopback binds are rejected and it is not a live Atlas/Hermes/RAG integration. Location: `/home/will/lab/swarm/openvino-doc-image-triage-npu/` @@ -121,7 +121,7 @@ cd /home/will/lab/swarm/openvino-doc-image-triage-npu /home/will/.venvs/npu/bin/python tests/smoke_test.py ``` -Expected: JSON ending with `"ok": true`. If the embeddings service is up, the result should show positive NPU busy-time delta and each embedded page should report `verified_npu: true`. +Expected: JSON ending with `"ok": true`. The smoke test generates only synthetic fixtures, verifies non-loopback HTTP binds are rejected, starts its temporary server on a preflighted free localhost port, and terminates it before exit. 
If the embeddings service is up, the result should show positive NPU busy-time delta and each embedded page should report `verified_npu: true`. ## Example output shape diff --git a/openvino-doc-image-triage-npu/SPEC.md b/openvino-doc-image-triage-npu/SPEC.md index 07885e0..d0f7cf4 100644 --- a/openvino-doc-image-triage-npu/SPEC.md +++ b/openvino-doc-image-triage-npu/SPEC.md @@ -5,7 +5,7 @@ Status: CLI-first prototype specification; not a live Atlas/Hermes integration. ## Safety stance - Default workflow is local CLI execution against explicitly named files. -- Optional HTTP is disabled unless a human starts it, binds to localhost, and is intended for `127.0.0.1:18829` only. +- Optional HTTP is disabled unless a human starts it, is constrained to loopback (`127.0.0.1` or `localhost`; `::1` passes host validation, but the stdlib `ThreadingHTTPServer` is IPv4-only by default and cannot bind it), and is intended for `127.0.0.1:18829` only. - No persistent systemd unit, Docker service, gateway hook, Atlas/Hermes route, RAG route, Chroma/vector collection mutation, or in-place reindexing is part of this spec. - Smoke data must be synthetic/non-private only. Do not point this tool at Will's private document, image, screenshot, Downloads, Desktop, Obsidian, or photo-library directories without explicit approval. - NPU claims require `/sys/class/accel/accel0/device/npu_busy_time_us` before/after deltas. HTTP 200, JSON output, or model-load success alone is not NPU proof. @@ -107,10 +107,10 @@ Expected smoke coverage: - Runs CLI triage against the synthetic invoice image/PDF under an explicit allowed root. - Asserts privacy flags (`external_uploads: false`, no full path by default). - Asserts invoice category/needs-attention behavior on synthetic text. -- Starts a temporary localhost HTTP server on an ephemeral smoke port, calls `/healthz` and `/triage`, verifies no full path leakage, rejects attempts to widen allowed roots, and rejects external embedding URLs.
+- Starts a temporary localhost HTTP server on a preflighted free ephemeral port, calls `/healthz` and `/triage`, verifies no full path leakage, rejects attempts to widen allowed roots, rejects external embedding URLs, and verifies non-loopback binds are rejected. - Terminates the temporary server. -The smoke port in tests should stay ephemeral/non-live (currently `18828`) to avoid claiming `18829` as a persistent service. +The smoke port in tests should stay OS-assigned ephemeral/non-live to avoid claiming `18829` as a persistent service. ## NPU busy-time verification plan diff --git a/openvino-doc-image-triage-npu/server.py b/openvino-doc-image-triage-npu/server.py index 96e62b0..673ccba 100644 --- a/openvino-doc-image-triage-npu/server.py +++ b/openvino-doc-image-triage-npu/server.py @@ -13,6 +13,7 @@ configured allowed roots. It never uploads document/image contents externally. from __future__ import annotations import argparse +import ipaddress import json import os from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer @@ -23,6 +24,19 @@ from urllib.parse import urlparse from triage import DEFAULT_EMBED_URL, TriageOptions, read_npu_busy, triage_batch, triage_file +def _validate_loopback_host(host: str) -> str: + """Reject non-loopback binds; this prototype is never a LAN service.""" + normalized = host.strip() + if normalized == "localhost": + return normalized + try: + if ipaddress.ip_address(normalized).is_loopback: + return normalized + except ValueError: + pass + raise ValueError("host must be localhost/loopback for this prototype") + + def _roots_within_configured(requested_roots: list[Any], configured_roots: list[Path]) -> list[Path]: """Return request roots only when they narrow the startup allowlist.""" narrowed: list[Path] = [] @@ -166,10 +180,14 @@ def main() -> int: parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18829"))) parser.add_argument("--allowed-root", action="append", default=[], 
help="allowed local root; may repeat") args = parser.parse_args() + try: + host = _validate_loopback_host(args.host) + except ValueError as exc: + parser.error(str(exc)) roots = [Path(p).expanduser().resolve() for p in args.allowed_root] or [Path.cwd().resolve()] - httpd = ThreadingHTTPServer((args.host, args.port), Handler) + httpd = ThreadingHTTPServer((host, args.port), Handler) httpd.allowed_roots = roots # type: ignore[attr-defined] - print(json.dumps({"service": "openvino-doc-image-triage-npu", "host": args.host, "port": args.port, "allowed_roots": [str(p) for p in roots]}), flush=True) + print(json.dumps({"service": "openvino-doc-image-triage-npu", "host": host, "port": args.port, "allowed_roots": [str(p) for p in roots]}), flush=True) httpd.serve_forever() return 0 diff --git a/openvino-doc-image-triage-npu/tests/smoke_test.py b/openvino-doc-image-triage-npu/tests/smoke_test.py index d4fc4af..b504bbc 100644 --- a/openvino-doc-image-triage-npu/tests/smoke_test.py +++ b/openvino-doc-image-triage-npu/tests/smoke_test.py @@ -2,6 +2,7 @@ from __future__ import annotations import json +import socket import subprocess import sys import tempfile @@ -42,6 +43,29 @@ def busy() -> int | None: return None +def choose_free_loopback_port() -> int: + """Ask the OS for a free localhost port and verify it is not listening yet.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + port = int(sock.getsockname()[1]) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe: + probe.settimeout(0.25) + assert probe.connect_ex(("127.0.0.1", port)) != 0, f"selected port already has a listener: {port}" + return port + + +def assert_loopback_bind_policy() -> None: + blocked = subprocess.run( + [sys.executable, "server.py", "--host", "0.0.0.0", "--port", "0", "--allowed-root", str(ROOT)], + cwd=ROOT, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + assert blocked.returncode != 0, blocked.stdout + 
blocked.stderr + assert "loopback" in blocked.stderr.lower(), blocked.stderr + + def main() -> int: run([sys.executable, "make_samples.py"]) invoice = SAMPLES / "synthetic_invoice.png" @@ -69,20 +93,23 @@ def main() -> int: assert (emb.get("npu_busy_delta_us") or 0) > 0, emb assert after > before, {"before": before, "after": after, "embedding": emb} - # HTTP smoke on an ephemeral localhost port so we do not collide with 18820 during tests. - proc = subprocess.Popen([sys.executable, "server.py", "--host", "127.0.0.1", "--port", "18828", "--allowed-root", str(ROOT)], cwd=ROOT, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + # HTTP smoke on a preflighted free localhost port so we do not collide with live/prototype ports. + assert_loopback_bind_policy() + smoke_port = choose_free_loopback_port() + base_url = f"http://127.0.0.1:{smoke_port}" + proc = subprocess.Popen([sys.executable, "server.py", "--host", "127.0.0.1", "--port", str(smoke_port), "--allowed-root", str(ROOT)], cwd=ROOT, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) try: deadline = time.time() + 5 while time.time() < deadline: try: - health = urllib.request.urlopen("http://127.0.0.1:18828/healthz", timeout=1).read() + health = urllib.request.urlopen(f"{base_url}/healthz", timeout=1).read() assert b"openvino-doc-image-triage-npu" in health break except Exception: time.sleep(0.1) else: raise AssertionError("server did not become ready") - resp = post_json("http://127.0.0.1:18828/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}}) + resp = post_json(f"{base_url}/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}}) assert resp["ok"] is True, resp assert resp["result"]["source_path_basename"] == "synthetic_invoice.png" assert "source_path" not in resp["result"] @@ -92,7 +119,7 @@ def main() -> int: outside.write(b"sensitive text outside configured artifact root") outside.flush() status, blocked = post_json_status( - 
"http://127.0.0.1:18828/triage", + f"{base_url}/triage", {"path": outside.name, "options": {"allowed_roots": ["/tmp"], "dry_run": True, "use_embeddings": False}}, ) assert status == 400, blocked @@ -101,7 +128,7 @@ def main() -> int: # Request bodies must not redirect extracted text to caller-supplied endpoints. status, blocked = post_json_status( - "http://127.0.0.1:18828/triage", + f"{base_url}/triage", {"path": str(invoice), "options": {"embedding_url": "http://198.51.100.1:9/v1/embeddings"}}, ) assert status == 400, blocked From b538a5a1f900d7417cd6bf711c169a8f5839652e Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:13:44 -0700 Subject: [PATCH 5/6] feat: refresh OpenVINO GenAI NPU worker prototype --- openvino-genai-npu-worker/CONTRACT.md | 11 +- openvino-genai-npu-worker/README.md | 26 ++++ openvino-genai-npu-worker/pytest.ini | 2 + openvino-genai-npu-worker/smoke_llm_npu.py | 28 ++-- .../systemd/openvino-genai-npu-worker.service | 1 + .../tests/test_worker.py | 131 ++++++++++++++++++ openvino-genai-npu-worker/worker.py | 90 ++++++++---- 7 files changed, 253 insertions(+), 36 deletions(-) create mode 100644 openvino-genai-npu-worker/pytest.ini create mode 100644 openvino-genai-npu-worker/tests/test_worker.py diff --git a/openvino-genai-npu-worker/CONTRACT.md b/openvino-genai-npu-worker/CONTRACT.md index babebbb..604170c 100644 --- a/openvino-genai-npu-worker/CONTRACT.md +++ b/openvino-genai-npu-worker/CONTRACT.md @@ -1,6 +1,6 @@ # Bounded OpenVINO GenAI NPU worker contract -Status: proposed prototype contract; not a live Atlas/Hermes routing dependency. +Status: prototype contract implemented locally; not a live Atlas/Hermes routing dependency. Default address: `http://127.0.0.1:18820`. ## Purpose and hard boundary @@ -167,7 +167,7 @@ Validation/error behavior: - Unsupported path: `404` JSON `{"error":"not found"}`. 
- Unsupported job, empty input, too-long input, invalid token bound, missing model, or generation failure: JSON `{"error":"..."}` with non-2xx preferred for future implementations. The current stdlib prototype returns `400` for these errors. -- If `npu_busy_delta_us <= 0`, the response should be treated as failed by smoke tests even if an HTTP handler emitted `200`. +- If `npu_busy_delta_us <= 0`, the response should be treated as failed by smoke tests even if an HTTP handler emitted `200`; the refreshed prototype returns `503` with the generation payload plus an `error` field. ## Prompt/job contract @@ -253,6 +253,13 @@ Also verify the temporary listener is gone: ss -ltnp | grep ':18820' && { echo 'temporary smoke server still running'; exit 1; } || true ``` +Unit tests that do not load the model or require private data: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +python -m pytest -q +``` + ## NPU busy-time verification plan Acceptance for any NPU claim requires all of the following: diff --git a/openvino-genai-npu-worker/README.md b/openvino-genai-npu-worker/README.md index 1baf519..0cfda4a 100644 --- a/openvino-genai-npu-worker/README.md +++ b/openvino-genai-npu-worker/README.md @@ -18,6 +18,7 @@ The worker does not write memory, does not restart Atlas/Hermes, does not change - `CONTRACT.md` — bounded-worker service contract, endpoint/CLI API, smoke plan, NPU verification, docs implications, and no-go criteria. - `worker.py` — stdlib HTTP API plus CLI wrapper. - `smoke_llm_npu.py` — direct GenAI smoke test with NPU busy-time verification. +- `tests/test_worker.py` — unit tests with a fake GenAI pipeline and synthetic busy-time counter. - `systemd/openvino-genai-npu-worker.service` — optional user-service template; not installed by this prototype. ## Model/cache @@ -73,15 +74,20 @@ Observed cold-ish smoke after download/cache setup: --input 'Kanban task asks for a small OpenVINO GenAI NPU worker prototype.' 
``` +Exit code is non-zero if validation fails, generation fails, or the worker-reported `npu_busy_delta_us` is not positive. + ## HTTP usage Start locally only: ```bash cd /home/will/lab/swarm/openvino-genai-npu-worker +ss -ltnp | grep ':18820' && { echo 'port 18820 already in use'; exit 1; } || true /home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820 ``` +The server also refuses startup if a listener is already accepting connections on `127.0.0.1:18820`. + Endpoints: ```text @@ -103,6 +109,26 @@ curl -s http://127.0.0.1:18820/v1/worker/generate \ Response includes `npu_busy_delta_us`; treat zero as failure even if HTTP status is 200. +## Unit tests + +These tests use only synthetic strings and a fake GenAI pipeline, so they do not load the model or touch private data: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +python -m pytest -q +``` + +## Environment variables + +```text +OV_GENAI_NPU_MODEL=/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov +OV_GENAI_NPU_CACHE=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4 +OV_GENAI_NPU_HOST=127.0.0.1 +OV_GENAI_NPU_PORT=18820 +``` + +Only `127.0.0.1` is accepted by the current prototype; wider binds require an explicit code change and approval. + ## Safety boundaries - Binds only to `127.0.0.1` by default; non-local bind is refused in code. 
diff --git a/openvino-genai-npu-worker/pytest.ini b/openvino-genai-npu-worker/pytest.ini new file mode 100644 index 0000000..5ee6477 --- /dev/null +++ b/openvino-genai-npu-worker/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +testpaths = tests diff --git a/openvino-genai-npu-worker/smoke_llm_npu.py b/openvino-genai-npu-worker/smoke_llm_npu.py index aba039f..533632c 100644 --- a/openvino-genai-npu-worker/smoke_llm_npu.py +++ b/openvino-genai-npu-worker/smoke_llm_npu.py @@ -10,31 +10,42 @@ import argparse import json import time from pathlib import Path - -import openvino_genai as ov_genai +from typing import Any DEFAULT_MODEL = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_CACHE = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") -def read_busy() -> int: - return int(BUSY_PATH.read_text().strip()) +def import_openvino_genai() -> Any: + import openvino_genai as ov_genai # type: ignore[import-not-found] + + return ov_genai + + +def read_busy(path: Path = BUSY_PATH) -> int: + return int(path.read_text().strip()) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--model", default=DEFAULT_MODEL) parser.add_argument("--cache-dir", default=DEFAULT_CACHE) - parser.add_argument("--prompt", default="Write a concise title for: User asked Atlas to summarize NPU worker options.") + parser.add_argument("--busy-path", default=str(BUSY_PATH)) + parser.add_argument("--prompt", default="Write a concise title for: Synthetic NPU worker contract smoke with no routing changes.") parser.add_argument("--max-new-tokens", type=int, default=24) args = parser.parse_args() model_path = Path(args.model) cache_dir = Path(args.cache_dir) + busy_path = Path(args.busy_path) cache_dir.mkdir(parents=True, exist_ok=True) if not model_path.exists(): raise SystemExit(f"model path does not exist: {model_path}") + if not busy_path.exists(): + raise SystemExit(f"NPU busy-time counter does 
not exist: {busy_path}") + if args.max_new_tokens < 1 or args.max_new_tokens > 256: + raise SystemExit("max-new-tokens must be between 1 and 256") config = { "CACHE_DIR": str(cache_dir), @@ -44,15 +55,16 @@ def main() -> int: "GENERATE_HINT": "FAST_COMPILE", } - before = read_busy() + ov_genai = import_openvino_genai() + before = read_busy(busy_path) load_start = time.monotonic() - pipe = ov_genai.LLMPipeline(str(model_path), "NPU", config) + pipe = ov_genai.LLMPipeline(str(model_path), "NPU", **config) load_ms = round((time.monotonic() - load_start) * 1000, 2) gen_start = time.monotonic() output = pipe.generate(args.prompt, max_new_tokens=args.max_new_tokens) gen_ms = round((time.monotonic() - gen_start) * 1000, 2) - after = read_busy() + after = read_busy(busy_path) result = { "model": str(model_path), "device": "NPU", diff --git a/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service b/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service index 910c940..e2b530f 100644 --- a/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service +++ b/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service @@ -7,6 +7,7 @@ Type=simple WorkingDirectory=/home/will/lab/swarm/openvino-genai-npu-worker Environment=OV_GENAI_NPU_MODEL=/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov Environment=OV_GENAI_NPU_CACHE=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4 +Environment=OV_GENAI_NPU_HOST=127.0.0.1 Environment=OV_GENAI_NPU_PORT=18820 ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-genai-npu-worker/worker.py --host 127.0.0.1 --port 18820 Restart=on-failure diff --git a/openvino-genai-npu-worker/tests/test_worker.py b/openvino-genai-npu-worker/tests/test_worker.py new file mode 100644 index 0000000..413ec09 --- /dev/null +++ b/openvino-genai-npu-worker/tests/test_worker.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + 
+import worker + + +class FakePipeline: + def __init__(self, model_path: str, device: str, config: dict[str, object], busy_path: Path, output: str = "Synthetic title"): + self.model_path = model_path + self.device = device + self.config = config + self.busy_path = busy_path + self.output = output + self.calls: list[tuple[str, int]] = [] + + def generate(self, prompt: str, *, max_new_tokens: int): + self.calls.append((prompt, max_new_tokens)) + before = int(self.busy_path.read_text().strip()) + self.busy_path.write_text(str(before + 1234)) + return self.output + + +class FakeGenAI: + def __init__(self, busy_path: Path, output: str = "Synthetic title"): + self.busy_path = busy_path + self.output = output + self.pipeline: FakePipeline | None = None + + def LLMPipeline(self, model_path: str, device: str, *args: object, **kwargs: object): # noqa: N802 - mirrors OpenVINO API + if args and isinstance(args[0], dict): + config: dict[str, object] = {str(k): v for k, v in args[0].items()} + else: + config = dict(kwargs) + self.pipeline = FakePipeline(model_path, device, config, self.busy_path, self.output) + return self.pipeline + + +@pytest.fixture() +def worker_paths(tmp_path: Path): + model_path = tmp_path / "model" + cache_dir = tmp_path / "cache" + busy_path = tmp_path / "npu_busy_time_us" + model_path.mkdir() + busy_path.write_text("100") + return model_path, cache_dir, busy_path + + +def test_generate_uses_npu_config_and_reports_busy_delta(monkeypatch: pytest.MonkeyPatch, worker_paths): + model_path, cache_dir, busy_path = worker_paths + fake_genai = FakeGenAI(busy_path) + monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai) + + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_port=18820) + result = npu_worker.generate("title", "Synthetic non-private kanban notification.", max_new_tokens=24) + + assert result.npu_busy_before_us == 100 + assert result.npu_busy_after_us == 1334 + assert result.npu_busy_delta_us 
== 1234 + assert result.text == "Synthetic title" + assert fake_genai.pipeline is not None + assert fake_genai.pipeline.device == "NPU" + assert fake_genai.pipeline.config["CACHE_DIR"] == str(cache_dir) + assert fake_genai.pipeline.config["MAX_PROMPT_LEN"] == 1024 + assert fake_genai.pipeline.calls[0][1] == 24 + + +def test_memory_alias_json_wrapping(monkeypatch: pytest.MonkeyPatch, worker_paths): + model_path, cache_dir, busy_path = worker_paths + fake_genai = FakeGenAI(busy_path, output='[{"fact":"synthetic stable preference","confidence":0.8}]') + monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai) + + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + result = npu_worker.generate("memory_candidate", "Synthetic user says they prefer concise answers.") + + assert result.parsed_json is not None + assert result.parsed_json["candidates"][0]["fact"] == "synthetic stable preference" + assert "wrapped" in result.parsed_json["notes"] + + +@pytest.mark.parametrize( + ("job", "user_input", "max_new_tokens", "message"), + [ + ("bad", "hello", 1, "unsupported job"), + ("title", "", 1, "non-empty"), + ("title", "x" * (worker.MAX_INPUT_CHARS + 1), 1, "input too long"), + ("title", "hello", worker.MAX_NEW_TOKENS + 1, "max_new_tokens"), + ], +) +def test_validation_errors(monkeypatch: pytest.MonkeyPatch, worker_paths, job: str, user_input: str, max_new_tokens: int, message: str): + model_path, cache_dir, busy_path = worker_paths + monkeypatch.setattr(worker, "import_openvino_genai", lambda: FakeGenAI(busy_path)) + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + + with pytest.raises(ValueError, match=message): + npu_worker.generate(job, user_input, max_new_tokens=max_new_tokens) + + +def test_health_reports_actual_bind_and_limits(worker_paths): + model_path, cache_dir, busy_path = worker_paths + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, 
bind_host="127.0.0.1", bind_port=18821) + + health = npu_worker.health() + + assert health["bind"] == "127.0.0.1:18821" + assert health["max_input_chars"] == 6000 + assert health["max_new_tokens"] == 256 + assert health["busy_time_us"] == 100 + + +def test_response_payload_shape(worker_paths): + model_path, cache_dir, busy_path = worker_paths + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + result = worker.GenerationResult( + text="ok", + parsed_json={"severity": "info"}, + timing_ms={"load": 1.0, "initial_load": 1.0, "generate": 2.0, "total": 3.0}, + npu_busy_delta_us=5, + npu_busy_before_us=10, + npu_busy_after_us=15, + ) + + payload = worker.response_payload(npu_worker, "notification", result) + + assert json.dumps(payload) + assert payload["device"] == "NPU" + assert payload["job"] == "notification" + assert payload["json"] == {"severity": "info"} diff --git a/openvino-genai-npu-worker/worker.py b/openvino-genai-npu-worker/worker.py index 9ec7ed8..2e11031 100644 --- a/openvino-genai-npu-worker/worker.py +++ b/openvino-genai-npu-worker/worker.py @@ -10,6 +10,7 @@ import argparse import json import os import re +import socket import threading import time from dataclasses import dataclass @@ -18,8 +19,6 @@ from pathlib import Path from typing import Any, cast from urllib.parse import urlparse -import openvino_genai as ov_genai # type: ignore[import-not-found] - MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" @@ -27,6 +26,14 @@ BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") HOST = "127.0.0.1" PORT = 18820 MAX_INPUT_CHARS = 6000 +MAX_NEW_TOKENS = 256 +GENAI_CONFIG = { + "CACHE_DIR": DEFAULT_CACHE_DIR, + "MAX_PROMPT_LEN": 1024, + "MIN_RESPONSE_LEN": 64, + "PREFILL_HINT": "DYNAMIC", + "GENERATE_HINT": "FAST_COMPILE", +} DEFAULTS = { "title": 32, 
"summary": 160, @@ -48,8 +55,20 @@ PROMPTS = { } -def read_busy() -> int: - return int(BUSY_PATH.read_text().strip()) +def import_openvino_genai() -> Any: + """Import OpenVINO GenAI lazily so unit tests do not require the NPU venv.""" + + import openvino_genai as ov_genai # type: ignore[import-not-found] + + return ov_genai + + +def listener_exists(host: str, port: int) -> bool: + """Return True when a TCP listener already accepts connections.""" + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(0.2) + return sock.connect_ex((host, port)) == 0 def coerce_json(text: str) -> Any | None: @@ -79,9 +98,20 @@ class GenerationResult: class NpuWorker: - def __init__(self, model_path: str, cache_dir: str): + def __init__( + self, + model_path: str, + cache_dir: str, + *, + busy_path: Path = BUSY_PATH, + bind_host: str = HOST, + bind_port: int = PORT, + ): self.model_path = Path(model_path) self.cache_dir = Path(cache_dir) + self.busy_path = Path(busy_path) + self.bind_host = bind_host + self.bind_port = bind_port self.cache_dir.mkdir(parents=True, exist_ok=True) self._pipe = None self._load_ms: float | None = None @@ -89,21 +119,20 @@ class NpuWorker: self._loaded_at: float | None = None if not self.model_path.exists(): raise FileNotFoundError(f"model path does not exist: {self.model_path}") + if not self.busy_path.exists(): + raise FileNotFoundError(f"NPU busy-time counter does not exist: {self.busy_path}") + + def read_busy(self) -> int: + return int(self.busy_path.read_text().strip()) def load(self) -> None: if self._pipe is not None: return start = time.monotonic() # NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching. 
- self._pipe = ov_genai.LLMPipeline( - str(self.model_path), - "NPU", - CACHE_DIR=str(self.cache_dir), - MAX_PROMPT_LEN=1024, - MIN_RESPONSE_LEN=64, - PREFILL_HINT="DYNAMIC", - GENERATE_HINT="FAST_COMPILE", - ) + ov_genai = import_openvino_genai() + config = GENAI_CONFIG | {"CACHE_DIR": str(self.cache_dir)} + self._pipe = ov_genai.LLMPipeline(str(self.model_path), "NPU", **config) self._load_ms = round((time.monotonic() - start) * 1000, 2) self._loaded_at = time.time() @@ -115,19 +144,19 @@ class NpuWorker: if len(user_input) > MAX_INPUT_CHARS: raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}") max_new_tokens = int(max_new_tokens or DEFAULTS[job]) - if max_new_tokens < 1 or max_new_tokens > 256: - raise ValueError("max_new_tokens must be between 1 and 256") + if max_new_tokens < 1 or max_new_tokens > MAX_NEW_TOKENS: + raise ValueError(f"max_new_tokens must be between 1 and {MAX_NEW_TOKENS}") prompt = PROMPTS[job].format(input=user_input.strip()) with self._lock: load_start = time.monotonic() self.load() load_ms = round((time.monotonic() - load_start) * 1000, 2) - before = read_busy() + before = self.read_busy() gen_start = time.monotonic() pipe = cast(Any, self._pipe) text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip() generate_ms = round((time.monotonic() - gen_start) * 1000, 2) - after = read_busy() + after = self.read_busy() parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None if job == "memory_candidate" and isinstance(parsed, list): parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"} @@ -151,10 +180,11 @@ class NpuWorker: "loaded": self._pipe is not None, "initial_load_ms": self._load_ms, "loaded_at": self._loaded_at, - "busy_time_us": read_busy(), + "busy_time_us": self.read_busy(), "max_input_chars": MAX_INPUT_CHARS, + "max_new_tokens": MAX_NEW_TOKENS, "jobs": sorted(PROMPTS), - "bind": f"{HOST}:{PORT}", + 
"bind": f"{self.bind_host}:{self.bind_port}", } @@ -175,7 +205,7 @@ def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> d def make_handler(worker: NpuWorker): class Handler(BaseHTTPRequestHandler): - server_version = "openvino-genai-npu-worker/0.1" + server_version = "openvino-genai-npu-worker/0.2" def log_message(self, format: str, *args: Any) -> None: # Log only method/path/status metadata, not raw request bodies. @@ -215,7 +245,12 @@ def make_handler(worker: NpuWorker): if job == "memory": job = "memory_candidate" result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens")) - self.send_json(200, response_payload(worker, job, result)) + body = response_payload(worker, job, result) + if result.npu_busy_delta_us <= 0: + body["error"] = "NPU busy-time counter did not increase during generation" + self.send_json(503, body) + return + self.send_json(200, body) except Exception as exc: self.send_json(400, {"error": str(exc)}) @@ -226,21 +261,24 @@ def cli(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker") parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH)) parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR)) - parser.add_argument("--host", default=HOST) + parser.add_argument("--host", default=os.environ.get("OV_GENAI_NPU_HOST", HOST)) parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT))) parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP") parser.add_argument("--input", help="Input text for --job") parser.add_argument("--max-new-tokens", type=int) args = parser.parse_args(argv) - worker = NpuWorker(args.model_path, args.cache_dir) + if args.host != "127.0.0.1": + raise SystemExit("Refusing non-local bind without code change/explicit approval") + + worker = 
NpuWorker(args.model_path, args.cache_dir, bind_host=args.host, bind_port=args.port) if args.job: result = worker.generate(args.job, args.input or "", args.max_new_tokens) print(json.dumps(response_payload(worker, args.job, result), indent=2)) return 0 if result.npu_busy_delta_us > 0 else 2 - if args.host != "127.0.0.1": - raise SystemExit("Refusing non-local bind without code change/explicit approval") + if listener_exists(args.host, args.port): + raise SystemExit(f"Refusing to start: listener already exists on {args.host}:{args.port}") server = ThreadingHTTPServer((args.host, args.port), make_handler(worker)) print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged") server.serve_forever() From 1413cfd888f277c760bc94eb91cce7a329bbf1e1 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:14:26 -0700 Subject: [PATCH 6/6] [verified] refresh OpenVINO router classifier prototype --- openvino-classifier-npu/CONTRACT.md | 10 +- openvino-classifier-npu/README.md | 14 +++ .../openvino-router-classifier.service | 1 + openvino-classifier-npu/router_classifier.py | 49 ++++++-- openvino-classifier-npu/smoke_classifier.py | 113 ++++++++++++++++++ .../tests/test_router_classifier.py | 19 +++ 6 files changed, 196 insertions(+), 10 deletions(-) create mode 100644 openvino-classifier-npu/smoke_classifier.py diff --git a/openvino-classifier-npu/CONTRACT.md b/openvino-classifier-npu/CONTRACT.md index 8e29eac..34eadba 100644 --- a/openvino-classifier-npu/CONTRACT.md +++ b/openvino-classifier-npu/CONTRACT.md @@ -191,7 +191,7 @@ Response: Batch limits for prototype review: -- Keep batches small, ideally <= 32 items. +- Keep batches small; the prototype rejects empty batches and batches larger than `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE` (default `32`). - Use only synthetic fixtures unless Will explicitly approves a real non-private sample set. - Do not retain request bodies to disk. 
@@ -213,6 +213,7 @@ Required flags/env: - `--port` / `OPENVINO_CLASSIFIER_PORT`; default `18819`. - `--embed-url` / `OPENVINO_CLASSIFIER_EMBED_URL`; default `http://127.0.0.1:18817/v1/embeddings`. - `--timeout-s` / `OPENVINO_CLASSIFIER_TIMEOUT_S`; default `30`. +- `--max-batch-size` / `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`; default `32`. - `--no-warmup` to defer prototype embedding until first request. A future dedicated CLI mode may be added for one-shot JSONL classification, but foreground HTTP review is sufficient for the dry-run contract. @@ -280,6 +281,13 @@ echo "$response" | jq '{npu_busy_delta_us, sysfs_npu_busy_delta_us, warnings}' echo "outer_sysfs_npu_busy_delta_us=$((after-before))" ``` +Optional localhost smoke helper, after starting the foreground service: + +```bash +/home/will/.venvs/npu/bin/python openvino-classifier-npu/smoke_classifier.py \ + --base-url http://127.0.0.1:18819 +``` + Acceptance for an NPU-backed classification request: - HTTP request succeeds. diff --git a/openvino-classifier-npu/README.md b/openvino-classifier-npu/README.md index 74654cb..b8490b9 100644 --- a/openvino-classifier-npu/README.md +++ b/openvino-classifier-npu/README.md @@ -17,6 +17,7 @@ services, or send external messages. - Default port: `18819` - Default bind: `127.0.0.1` - Upstream: `http://127.0.0.1:18817/v1/embeddings` +- Batch limit: `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`, default `32` - Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0` - NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` before/after plus upstream `npu_busy_delta_us` @@ -90,6 +91,10 @@ cd /home/will/lab/swarm/openvino-classifier-npu /home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819 ``` +Environment variables mirror the flags: `OPENVINO_CLASSIFIER_HOST`, +`OPENVINO_CLASSIFIER_PORT`, `OPENVINO_CLASSIFIER_EMBED_URL`, +`OPENVINO_CLASSIFIER_TIMEOUT_S`, and `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`. 
+ Then from another shell: ```bash @@ -102,6 +107,15 @@ curl -fsS http://127.0.0.1:18819/v1/classify \ A valid NPU-backed response must have positive `npu_busy_delta_us`; HTTP 200 by itself is not considered proof. +Synthetic fixture smoke helper, after the foreground service is running: + +```bash +/home/will/.venvs/npu/bin/python smoke_classifier.py --base-url http://127.0.0.1:18819 +``` + +The helper refuses non-local URLs, checks fixture label expectations, and prints +response plus outer sysfs NPU busy deltas. + ## Tests Unit tests use a fake embedding client and do not touch the NPU: diff --git a/openvino-classifier-npu/openvino-router-classifier.service b/openvino-classifier-npu/openvino-router-classifier.service index e537d47..9f44c97 100644 --- a/openvino-classifier-npu/openvino-router-classifier.service +++ b/openvino-classifier-npu/openvino-router-classifier.service @@ -9,6 +9,7 @@ WorkingDirectory=/home/will/lab/swarm/openvino-classifier-npu Environment=OPENVINO_CLASSIFIER_HOST=127.0.0.1 Environment=OPENVINO_CLASSIFIER_PORT=18819 Environment=OPENVINO_CLASSIFIER_EMBED_URL=http://127.0.0.1:18817/v1/embeddings +Environment=OPENVINO_CLASSIFIER_MAX_BATCH_SIZE=32 ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-classifier-npu/router_classifier.py Restart=on-failure RestartSec=5 diff --git a/openvino-classifier-npu/router_classifier.py b/openvino-classifier-npu/router_classifier.py index 379cb29..c5150a1 100644 --- a/openvino-classifier-npu/router_classifier.py +++ b/openvino-classifier-npu/router_classifier.py @@ -30,6 +30,7 @@ MODEL = "bge-base-en-v1.5-int8-ov/prototype-router-v0" DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 18819 DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings" +DEFAULT_MAX_BATCH_SIZE = 32 NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") WORKFLOW_CATEGORIES = [ @@ -150,6 +151,26 @@ def npu_busy_time_us() -> int | None: return None +def env_int(name: str, default: int) -> int: + raw = 
os.environ.get(name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise SystemExit(f"{name} must be an integer, got {raw!r}") from exc + + +def env_float(name: str, default: float) -> float: + raw = os.environ.get(name) + if raw is None: + return default + try: + return float(raw) + except ValueError as exc: + raise SystemExit(f"{name} must be a number, got {raw!r}") from exc + + def clamp01(value: float) -> float: return max(0.0, min(1.0, value)) @@ -220,9 +241,10 @@ class EmbeddingClient: class ClassifierService: - def __init__(self, embed_url: str, *, timeout_s: float = 30.0) -> None: + def __init__(self, embed_url: str, *, timeout_s: float = 30.0, max_batch_size: int = DEFAULT_MAX_BATCH_SIZE) -> None: self.embed_url = embed_url self.client = EmbeddingClient(embed_url, timeout_s=timeout_s) + self.max_batch_size = max(1, int(max_batch_size)) self.loaded_at = time.time() self.prototype_texts: list[str] = [] self.prototype_keys: list[str] = [] @@ -255,6 +277,7 @@ class ClassifierService: "labels": ["tool_needed", "memory_candidate", "urgency", "workflow_category", "safety_confirmation_required"], "embedding_dim": self.embedding_dim, "prototype_count": len(self.prototype_texts), + "max_batch_size": self.max_batch_size, "prototype_npu_busy_delta_us": self.prototype_npu_busy_delta_us, "npu_busy_time_us": npu_busy_time_us(), "uptime_s": round(time.time() - self.loaded_at, 3), @@ -271,6 +294,7 @@ class ClassifierService: "workflow_category": 0.52, }, "enums": {"memory_candidate": MEMORY_VALUES, "urgency": URGENCY_VALUES, "workflow_category": WORKFLOW_CATEGORIES}, + "limits": {"max_batch_size": self.max_batch_size}, "prototype_ids": sorted(PROTOTYPES), } @@ -351,6 +375,10 @@ class ClassifierService: return response def batch_classify(self, items: list[dict[str, Any]], options: dict[str, Any] | None = None) -> dict[str, Any]: + if not items: + raise ValueError("items must contain at least one classification request") + if 
len(items) > self.max_batch_size: + raise ValueError(f"items exceeds max_batch_size={self.max_batch_size}") started = time.perf_counter() results = [self.classify(item.get("id"), str(item.get("text") or ""), options) for item in items] return { @@ -400,13 +428,15 @@ class ClassifierService: high_rule, high_codes, high_ev = best_rule(text, "urgency_high") critical_rule, critical_codes, critical_ev = best_rule(text, "urgency_critical") low_rule = 0.82 if re.search(r"\b(no rush|whenever convenient|low priority|someday|backlog)\b", text, re.I) else 0.0 - # Urgency is safety-sensitive for notifications. Prefer explicit rules; - # use prototype scores only when they are unusually strong. + # Urgency is safety-sensitive for notifications, so require explicit + # language instead of relying on broad prototype similarity. score_map = { - "low": max(low_rule, scores.get("urgency_low", 0.0) if scores.get("urgency_low", 0.0) >= 0.9 else 0.0), + # Urgency should be explicit; broad embedding similarity otherwise + # turns neutral requests such as "what time is it" into low/high/critical urgency. 
+ "low": low_rule, "normal": 0.68, - "high": max(high_rule, scores.get("urgency_high", 0.0) if scores.get("urgency_high", 0.0) >= 0.9 else 0.0), - "critical": max(critical_rule, scores.get("urgency_critical", 0.0) if scores.get("urgency_critical", 0.0) >= 0.92 else 0.0), + "high": high_rule, + "critical": critical_rule, } if score_map["critical"] >= 0.9: score_map["normal"] = 0.05 @@ -509,13 +539,14 @@ class Handler(BaseHTTPRequestHandler): def main() -> int: parser = argparse.ArgumentParser(description="Dry-run Atlas/Hermes router classifier") parser.add_argument("--host", default=os.environ.get("OPENVINO_CLASSIFIER_HOST", DEFAULT_HOST)) - parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT))) + parser.add_argument("--port", type=int, default=env_int("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT)) parser.add_argument("--embed-url", default=os.environ.get("OPENVINO_CLASSIFIER_EMBED_URL", DEFAULT_EMBED_URL)) - parser.add_argument("--timeout-s", type=float, default=float(os.environ.get("OPENVINO_CLASSIFIER_TIMEOUT_S", "30"))) + parser.add_argument("--timeout-s", type=float, default=env_float("OPENVINO_CLASSIFIER_TIMEOUT_S", 30.0)) + parser.add_argument("--max-batch-size", type=int, default=env_int("OPENVINO_CLASSIFIER_MAX_BATCH_SIZE", DEFAULT_MAX_BATCH_SIZE)) parser.add_argument("--no-warmup", action="store_true", help="skip prototype embedding warmup until first request") args = parser.parse_args() - service = ClassifierService(args.embed_url, timeout_s=args.timeout_s) + service = ClassifierService(args.embed_url, timeout_s=args.timeout_s, max_batch_size=args.max_batch_size) if not args.no_warmup: service.warmup() httpd = ThreadingHTTPServer((args.host, args.port), Handler) diff --git a/openvino-classifier-npu/smoke_classifier.py b/openvino-classifier-npu/smoke_classifier.py new file mode 100644 index 0000000..4f3eb41 --- /dev/null +++ b/openvino-classifier-npu/smoke_classifier.py @@ -0,0 +1,113 @@ +#!/usr/bin/env 
python3 +"""Local-only smoke test for the dry-run OpenVINO router classifier. + +This script uses only synthetic fixture messages. It assumes router_classifier.py is +already running on localhost and never installs/enables a persistent service. +""" +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any + +DEFAULT_BASE_URL = "http://127.0.0.1:18819" +BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") +FIXTURE = Path(__file__).resolve().parent / "fixtures" / "atlas_hermes_messages.jsonl" + + +def npu_busy_time_us() -> int | None: + try: + return int(BUSY_FILE.read_text().strip()) + except Exception: + return None + + +def get_json(url: str, timeout_s: float) -> dict[str, Any]: + with urllib.request.urlopen(url, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL + return json.loads(response.read().decode("utf-8")) + + +def post_json(url: str, payload: dict[str, Any], timeout_s: float) -> dict[str, Any]: + request = urllib.request.Request( + url, + data=json.dumps(payload).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL + return json.loads(response.read().decode("utf-8")) + + +def load_fixture(limit: int) -> list[dict[str, Any]]: + rows = [json.loads(line) for line in FIXTURE.read_text().splitlines() if line.strip()] + return rows[:limit] + + +def assert_expected(result: dict[str, Any], expected: dict[str, Any]) -> list[str]: + failures: list[str] = [] + labels = result.get("labels", {}) + for key, value in expected.items(): + actual_label = labels.get(key, {}) + actual_value = actual_label.get("value") + if actual_value != value: + failures.append(f"{result.get('id')}: {key} expected {value!r}, got {actual_value!r}") + return failures + + +def main() -> int: + 
parser = argparse.ArgumentParser(description="Smoke-test a running localhost router classifier") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--timeout-s", type=float, default=30.0) + parser.add_argument("--limit", type=int, default=10) + args = parser.parse_args() + + if not args.base_url.startswith("http://127.0.0.1:") and not args.base_url.startswith("http://localhost:"): + raise SystemExit("refusing non-local base URL; this smoke is localhost-only") + + before = npu_busy_time_us() + started = time.perf_counter() + try: + health = get_json(f"{args.base_url.rstrip('/')}/healthz", args.timeout_s) + labels = get_json(f"{args.base_url.rstrip('/')}/v1/labels", args.timeout_s) + rows = load_fixture(args.limit) + results = [] + failures: list[str] = [] + for row in rows: + result = post_json( + f"{args.base_url.rstrip('/')}/v1/classify", + {"id": row["id"], "text": row["text"], "options": {"include_evidence": False, "dry_run": True}}, + args.timeout_s, + ) + results.append(result) + failures.extend(assert_expected(result, row.get("expected", {}))) + after = npu_busy_time_us() + except urllib.error.URLError as exc: + raise SystemExit(f"smoke failed: {exc}") from exc + + response_npu_delta = sum((r.get("npu_busy_delta_us") or 0) for r in results) + outer_sysfs_delta = None if before is None or after is None else after - before + npu_proven = response_npu_delta > 0 and (outer_sysfs_delta is None or outer_sysfs_delta > 0) + summary = { + "ok": not failures, + "service": health.get("service"), + "mode": health.get("mode"), + "model": health.get("model"), + "label_count": len(labels.get("prototype_ids", [])), + "fixture_count": len(results), + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "response_npu_busy_delta_us": response_npu_delta, + "outer_sysfs_npu_busy_delta_us": outer_sysfs_delta, + "npu_proven": npu_proven, + "failures": failures, + } + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 if 
not failures and npu_proven else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/openvino-classifier-npu/tests/test_router_classifier.py b/openvino-classifier-npu/tests/test_router_classifier.py index c044c0c..5c25ccd 100644 --- a/openvino-classifier-npu/tests/test_router_classifier.py +++ b/openvino-classifier-npu/tests/test_router_classifier.py @@ -88,6 +88,14 @@ class RouterClassifierTests(unittest.TestCase): self.assertEqual(len(result["results"]), 2) self.assertGreater(result["npu_busy_delta_us"], 0) + def test_batch_limits_are_enforced(self): + svc = self.service() + with self.assertRaisesRegex(ValueError, "at least one"): + svc.batch_classify([]) + too_many = [{"id": str(i), "text": "What time is it?"} for i in range(router_classifier.DEFAULT_MAX_BATCH_SIZE + 1)] + with self.assertRaisesRegex(ValueError, "max_batch_size"): + svc.batch_classify(too_many) + def test_fixture_file_is_valid_jsonl(self): fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl" rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()] @@ -97,6 +105,17 @@ class RouterClassifierTests(unittest.TestCase): self.assertIn("text", row) self.assertIn("expected", row) + def test_synthetic_fixture_expectations(self): + svc = self.service() + fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl" + rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()] + for row in rows: + with self.subTest(row=row["id"]): + result = svc.classify(row["id"], row["text"], {"include_evidence": False}) + labels = result["labels"] + for label_name, expected_value in row["expected"].items(): + self.assertEqual(labels[label_name]["value"], expected_value) + if __name__ == "__main__": unittest.main()