diff --git a/openvino-classifier-npu/CONTRACT.md b/openvino-classifier-npu/CONTRACT.md new file mode 100644 index 0000000..34eadba --- /dev/null +++ b/openvino-classifier-npu/CONTRACT.md @@ -0,0 +1,339 @@ +# OpenVINO NPU classifier/router dry-run contract + +Status: specification for dry-run prototype refresh +Target port: `127.0.0.1:18819` +Owner context: Atlas/Hermes local assistant sidecar evaluation + +This service is an advisory classifier for Atlas/Hermes automation hints. It may suggest labels such as tool-needed, memory-candidate type, urgency, workflow category, and safety-confirmation-required, but it must not make or enforce live routing, memory, tool, or safety decisions without a separate explicit approval from Will. + +## Recommended model and runtime + +Recommended v1 runtime: small local Python HTTP/CLI service backed by the existing OpenVINO NPU embeddings service on `127.0.0.1:18817`. + +Recommended v1 model shape: + +- Primary signal: `bge-base-en-v1.5-int8-ov` embeddings from the live embeddings service. +- Classifier layer: inspectable deterministic rules plus cosine similarity against curated synthetic/prototype utterances. +- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0`. +- Device proof: request-level `npu_busy_delta_us` from `:18817` plus direct sysfs before/after reads from `/sys/class/accel/accel0/device/npu_busy_time_us`. + +Why this is preferred for the dry run: + +1. It reuses the already-live NPU embeddings path rather than adding a second model conversion/runtime dependency before contract validation. +2. Rules and prototypes are transparent enough for safety-sensitive routing hints; a reviewer can inspect why a message was labeled. +3. It avoids fine-tuning or training on private Atlas/Hermes transcripts. +4. It keeps the service small, localhost-only, and easy to start/stop during smoke tests. +5. It produces NPU activity through the embeddings path while making clear that final decision logic remains advisory. + +Defer a dedicated NPU sequence-classification model such as TinyBERT/MiniLM until the dry-run labels and thresholds have been evaluated against synthetic fixtures and explicitly-approved non-private examples. If pursued later, use OpenVINO Runtime/Optimum export with fixed input shapes suitable for NPU, and keep the rule layer for safety gates. + +## Non-goals and safety invariants + +The service must not: + +- Change Hermes/Atlas model routing, gateway routing, memory writes, tool-use permissions, or safety-confirmation behavior. +- Restart, stop, enable, or persist any live Atlas/Hermes/gateway/RAG service. +- Bind to anything broader than `127.0.0.1` by default. +- Mutate Chroma/vector collections, trigger reindexing, or write to RAG state. +- Process private document/image directories or private transcript dumps for smoke testing. +- Log raw prompts by default beyond normal foreground stderr during local review. +- Claim NPU success from HTTP 200 alone. + +## Endpoint contract + +All HTTP endpoints are local-only by default. + +Base URL: + +```text +http://127.0.0.1:18819 +``` + +### GET `/healthz`, `/health`, `/readyz`, `/` + +Purpose: liveness/readiness metadata. + +Response fields: + +- `status`: `starting | ok` +- `service`: `atlas-router-classifier` +- `version`: service version string +- `mode`: always `dry_run` +- `model`: model/runtime label +- `embed_url`: upstream embeddings URL +- `device`: expected to say `NPU-via-embedding-service` or equivalent +- `labels`: supported label names +- `embedding_dim`: embedding dimension after warmup +- `prototype_count`: number of synthetic prototype examples loaded +- `prototype_npu_busy_delta_us`: warmup delta reported by upstream embeddings, if available +- `npu_busy_time_us`: current sysfs counter value, if readable +- `warnings`: list of non-fatal warnings + +A healthy service is not enough to prove NPU execution. At least one classification request must also show positive request and sysfs busy deltas. + +### GET `/v1/labels` + +Purpose: publish schema information without dumping private examples. + +Response fields: + +- `model` +- `thresholds` + - `tool_needed`: recommended threshold `0.72` + - `memory_candidate`: recommended threshold `0.78` + - `safety_confirmation_required`: recommended threshold `0.80` + - `workflow_category`: recommended threshold `0.52` +- `enums` + - `memory_candidate`: `none`, `user_preference`, `durable_user_fact`, `environment_fact`, `workflow_convention`, `skill_candidate` + - `urgency`: `low`, `normal`, `high`, `critical` + - `workflow_category`: `chat`, `research`, `coding`, `debugging`, `devops`, `smart_home`, `media`, `note_taking`, `productivity`, `kanban`, `unknown` +- `prototype_ids`: names of curated synthetic prototype buckets + +### POST `/v1/classify` + +Purpose: classify one user/task message for advisory dry-run hints. + +Request: + +```json +{ + "id": "optional-trace-id", + "text": "Urgent: check whether port 18817 is listening and inspect systemd logs.", + "context": { + "platform": "cli", + "source": "user" + }, + "options": { + "include_evidence": true, + "include_embedding_debug": false, + "dry_run": true + } +} +``` + +Required behavior: + +- Reject empty text with HTTP 400. +- Default `dry_run` to true. +- Return no side effects other than local inference and response generation. +- Include evidence by default unless `include_evidence=false`. +- Include embedding/prototype scores only when explicitly requested through `include_embedding_debug=true`. + +Response: + +```json +{ + "id": "optional-trace-id", + "model": "bge-base-en-v1.5-int8-ov/prototype-router-v0", + "created": 1780590000, + "duration_ms": 12.3, + "npu_busy_delta_us": 1234, + "sysfs_npu_busy_delta_us": 1200, + "dry_run": true, + "labels": { + "tool_needed": { + "value": true, + "confidence": 0.84, + "threshold": 0.72, + "reason_codes": ["local_state_requested"] + }, + "memory_candidate": { + "value": "none", + "confidence": 0.31, + "threshold": 0.78, + "reason_codes": [] + }, + "urgency": { + "value": "high", + "confidence": 0.84, + "scores": {"low": 0.0, "normal": 0.2, "high": 0.84, "critical": 0.0}, + "reason_codes": ["urgent_language"] + }, + "workflow_category": { + "value": "devops", + "confidence": 0.86, + "scores": {"devops": 0.86, "unknown": 0.14} + }, + "safety_confirmation_required": { + "value": false, + "confidence": 0.0, + "threshold": 0.8, + "reason_codes": [] + } + }, + "warnings": [], + "evidence": [] +} +``` + +### POST `/v1/batch_classify` + +Purpose: classify a bounded batch of non-private synthetic or explicitly-approved messages. + +Request: + +```json +{ + "items": [ + {"id": "m1", "text": "What time is it in Seattle right now?"}, + {"id": "m2", "text": "Restart the live Atlas gateway and switch primary routing."} + ], + "options": {"include_evidence": false, "dry_run": true} +} +``` + +Response: + +- `model` +- `duration_ms` +- aggregate `npu_busy_delta_us` +- `results`: array of `/v1/classify` responses + +Batch limits for prototype review: + +- Keep batches small; the prototype rejects empty batches and batches larger than `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE` (default `32`). +- Use only synthetic fixtures unless Will explicitly approves a real non-private sample set. +- Do not retain request bodies to disk. + +## CLI contract + +The same implementation should support foreground review from the service directory: + +```bash +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py \ + --host 127.0.0.1 \ + --port 18819 \ + --embed-url http://127.0.0.1:18817/v1/embeddings +``` + +Required flags/env: + +- `--host` / `OPENVINO_CLASSIFIER_HOST`; default `127.0.0.1`. +- `--port` / `OPENVINO_CLASSIFIER_PORT`; default `18819`. +- `--embed-url` / `OPENVINO_CLASSIFIER_EMBED_URL`; default `http://127.0.0.1:18817/v1/embeddings`. +- `--timeout-s` / `OPENVINO_CLASSIFIER_TIMEOUT_S`; default `30`. +- `--max-batch-size` / `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`; default `32`. +- `--no-warmup` to defer prototype embedding until first request. + +A future dedicated CLI mode may be added for one-shot JSONL classification, but foreground HTTP review is sufficient for the dry-run contract. + +## Synthetic smoke-test plan + +Preconditions: + +1. Confirm `:18817` embeddings service is healthy. +2. Confirm `:18819` is not already listening. +3. Read `/sys/class/accel/accel0/device/npu_busy_time_us` before starting the request smoke. +4. Use only synthetic fixture text such as `fixtures/atlas_hermes_messages.jsonl`. + +Unit/schema smoke, no NPU dependency: + +```bash +cd /home/will/lab/swarm +/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v +``` + +Foreground service smoke: + +```bash +ss -ltnp | grep ':18819\b' || true +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819 +``` + +From another shell: + +```bash +curl -fsS http://127.0.0.1:18819/healthz | jq . +curl -fsS http://127.0.0.1:18819/v1/labels | jq . +curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke-devops","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true,"dry_run":true}}' | jq . +curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke-safety","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","options":{"include_evidence":true,"dry_run":true}}' | jq . +``` + +Expected label checks: + +- `smoke-devops`: `tool_needed.value=true`, `urgency.value=high`, `workflow_category.value=devops`. +- `smoke-safety`: `safety_confirmation_required.value=true`, no actual restart or routing change. +- Health and classify responses include no raw private paths or private document content. + +Shutdown: + +- Stop the foreground server with Ctrl-C. +- Re-run `ss -ltnp | grep ':18819\b' || true` and confirm no listener remains. + +## NPU busy-time verification plan + +Use sysfs plus service response fields; do not accept HTTP 200 alone. + +```bash +BUSY=/sys/class/accel/accel0/device/npu_busy_time_us +before=$(cat "$BUSY") +response=$(curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"npu-proof","text":"Check current systemd service status for the embeddings service.","options":{"include_evidence":false,"dry_run":true}}') +after=$(cat "$BUSY") +echo "$response" | jq '{npu_busy_delta_us, sysfs_npu_busy_delta_us, warnings}' +echo "outer_sysfs_npu_busy_delta_us=$((after-before))" +``` + +Optional localhost smoke helper, after starting the foreground service: + +```bash +/home/will/.venvs/npu/bin/python openvino-classifier-npu/smoke_classifier.py \ + --base-url http://127.0.0.1:18819 +``` + +Acceptance for an NPU-backed classification request: + +- HTTP request succeeds. +- Response `npu_busy_delta_us > 0` from upstream embeddings. +- Response `sysfs_npu_busy_delta_us > 0` when sysfs is readable. +- Outer shell `after-before > 0`. +- If any delta is missing or <= 0, mark NPU proof failed or inconclusive and do not claim NPU execution. + +## Docs and diagram implications + +If this prototype is refreshed or reviewed, update documentation to show: + +- Live baseline remains RAG `:18810`, RAG health `:18814`, Whisper NPU `:18816`, and embeddings `:18817`. +- Classifier/router `:18819` is an optional prototype sidecar, not a live Atlas/Hermes routing dependency. +- Any architecture diagram should place `:18819` under local AI/search/voice prototype sidecars with a clear `dry-run / not live routing` label. +- Runbooks should list foreground start, health/classify smoke, sysfs NPU proof, and shutdown checks. +- Service catalog entries should state `not installed/enabled` until Will approves persistent service enablement. +- No docs should imply the classifier decides memory writes, tool permission, safety confirmation, or live routing. + +Relevant docs inventory: + +- `docs/swarm-infrastructure.md` +- `docs/swarm-infrastructure.html` +- `docs/diagram-maintenance.md` +- `swarm-common/obsidian-vault/will/will-shared-zap/Runbooks/OpenVINO NPU Services Runbook.md` +- `swarm-common/obsidian-vault/will/will-shared-zap/Resources/Service Catalog.md` + +## No-go / defer criteria + +Do not proceed to implementation refresh, persistent service enablement, or live integration if any of the following hold: + +- `:18817` embeddings is unavailable and no approved NPU embedding fallback exists. +- `/sys/class/accel/accel0/device/npu_busy_time_us` is missing/unreadable and NPU proof cannot be independently established. +- Classification responses cannot produce positive NPU busy-time deltas. +- `:18819` is already occupied by an unknown or live service. +- Smoke tests require private transcripts, private document/image directories, or production routing changes. +- Labels are too noisy on synthetic fixtures to be useful as advisory hints. +- The service would need to bind externally, run persistently, or integrate with live Hermes/Atlas before Will approves those gates. +- Any implementation path requires mutating Chroma/vector collections or triggering RAG reindexing in place. + +## Implementation handoff notes + +Recommended next engineer actions: + +1. Verify or refresh `openvino-classifier-npu/router_classifier.py` to match this contract. +2. Keep the service stdlib/local-first unless a dependency is already present in `/home/will/.venvs/npu`. +3. Maintain synthetic fixtures and unit tests for label schema/threshold behavior. +4. Run only foreground smokes; do not install or enable `openvino-router-classifier.service`. +5. Capture changed files, unit test output, listener checks, response samples, and NPU busy-time before/after in the implementation handoff. diff --git a/openvino-classifier-npu/README.md b/openvino-classifier-npu/README.md new file mode 100644 index 0000000..9f32d21 --- /dev/null +++ b/openvino-classifier-npu/README.md @@ -0,0 +1,141 @@ +# OpenVINO NPU router classifier prototype + +Dry-run Atlas/Hermes message classifier/router prototype. + +The detailed dry-run contract is in [`CONTRACT.md`](./CONTRACT.md), including the +recommended model/runtime, HTTP/CLI schema, smoke-test plan, NPU busy-time proof, +docs/diagram implications, and no-go/defer criteria. + +It reuses the existing OpenVINO NPU embeddings service on `127.0.0.1:18817` and +serves an inspectable stdlib HTTP API on `127.0.0.1:18819`. It does not change +live Hermes/Atlas routing, write memory, mutate vector collections, restart +services, or send external messages. + +## Runtime shape + +- Service: `atlas-router-classifier` +- Default port: `18819` +- Default bind: `127.0.0.1` +- Upstream: `http://127.0.0.1:18817/v1/embeddings` +- Batch limit: `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`, default `32` +- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0` +- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` before/after plus upstream `npu_busy_delta_us` + +The classifier uses deterministic high-precision rules for safety/urgency/tool +signals plus cosine similarity against curated embedding prototypes for workflow +and memory recommendations. This is intentionally tunable without model training. + +## API + +### GET `/healthz` + +Returns service metadata, labels, prototype count, NPU sysfs counter, and warmup +NPU delta. + +### GET `/v1/labels` + +Returns label enum values, thresholds, and prototype IDs without dumping private +fixtures. + +### POST `/v1/classify` + +Request: + +```json +{ + "id": "optional trace id", + "text": "User message or task body to classify.", + "context": {"platform": "cli", "source": "user"}, + "options": { + "include_evidence": true, + "include_embedding_debug": false, + "dry_run": true + } +} +``` + +Response includes: + +- `labels.tool_needed`: boolean, confidence, threshold, reason codes +- `labels.memory_candidate`: `none | user_preference | durable_user_fact | environment_fact | workflow_convention | skill_candidate` +- `labels.urgency`: `low | normal | high | critical` +- `labels.workflow_category`: `chat | research | coding | debugging | devops | smart_home | media | note_taking | productivity | kanban | unknown` +- `labels.safety_confirmation_required`: boolean, confidence, reason codes +- `npu_busy_delta_us` and `sysfs_npu_busy_delta_us` +- `evidence` when requested + +### POST `/v1/batch_classify` + +Request: + +```json +{ + "items": [{"id": "m1", "text": "What time is it?"}], + "options": {"include_evidence": false, "dry_run": true} +} +``` + +## Local smoke test + +Check that the proposed port is free first: + +```bash +ss -ltnp | grep ':18819' || true +``` + +Run without installing anything extra; `/home/will/.venvs/npu` already has the +stdlib plus requests/openvino stack used by the upstream embeddings service: + +```bash +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819 +``` + +Environment variables mirror the flags: `OPENVINO_CLASSIFIER_HOST`, +`OPENVINO_CLASSIFIER_PORT`, `OPENVINO_CLASSIFIER_EMBED_URL`, +`OPENVINO_CLASSIFIER_TIMEOUT_S`, and `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`. + +Then from another shell: + +```bash +curl -fsS http://127.0.0.1:18819/healthz | jq . +curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true}}' | jq . +``` + +A valid NPU-backed response must have positive `npu_busy_delta_us`; HTTP 200 by +itself is not considered proof. + +Synthetic fixture smoke helper, after the foreground service is running: + +```bash +/home/will/.venvs/npu/bin/python smoke_classifier.py --base-url http://127.0.0.1:18819 +``` + +The helper refuses non-local URLs, checks fixture label expectations, and prints +response plus outer sysfs NPU busy deltas. + +## Tests + +Unit tests use a fake embedding client and do not touch the NPU: + +```bash +/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v +``` + +Fixture messages live at `fixtures/atlas_hermes_messages.jsonl`. + +## Optional systemd user unit + +A draft unit is included as `openvino-router-classifier.service`. Install only +after review/approval: + +```bash +cp openvino-router-classifier.service ~/.config/systemd/user/openvino-router-classifier.service +systemctl --user daemon-reload +systemctl --user start openvino-router-classifier.service +systemctl --user status openvino-router-classifier.service --no-pager +``` + +Do not enable it at boot or connect it to live Atlas/Hermes routing as part of this prototype task without explicit approval. Keep classifier decisions dry-run until a separate approved routing change lands. diff --git a/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl b/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl new file mode 100644 index 0000000..fd95e95 --- /dev/null +++ b/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl @@ -0,0 +1,10 @@ +{"id":"tool-time","text":"What time is it in Seattle right now?","expected":{"tool_needed":true,"workflow_category":"chat","urgency":"normal","safety_confirmation_required":false}} +{"id":"memory-preference","text":"Remember that I prefer concise answers in the terminal.","expected":{"memory_candidate":"user_preference","tool_needed":false,"safety_confirmation_required":false}} +{"id":"coding-debug","text":"Debug the failing pytest suite and inspect the git diff before opening a PR.","expected":{"tool_needed":true,"workflow_category":"debugging","urgency":"normal"}} +{"id":"devops-urgent","text":"Urgent: the embeddings service on port 18817 is down; check systemd logs and restore it.","expected":{"tool_needed":true,"workflow_category":"devops","urgency":"high"}} +{"id":"safety-routing","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","expected":{"tool_needed":true,"workflow_category":"devops","safety_confirmation_required":true}} +{"id":"destructive-reindex","text":"Delete the existing Chroma collection and reindex the Obsidian vault in place.","expected":{"tool_needed":true,"workflow_category":"note_taking","safety_confirmation_required":true}} +{"id":"research","text":"Research current OpenVINO NPU support for TinyBERT sequence classification and summarize sources.","expected":{"tool_needed":true,"workflow_category":"research"}} +{"id":"smart-home","text":"Turn off the living room lights and set the thermostat to 68.","expected":{"tool_needed":true,"workflow_category":"smart_home"}} +{"id":"media","text":"Transcribe this voice memo and extract action items.","expected":{"tool_needed":true,"workflow_category":"media"}} +{"id":"kanban","text":"Work kanban task t_5e123496 and block it if review is required.","expected":{"tool_needed":true,"workflow_category":"kanban"}} diff --git a/openvino-classifier-npu/openvino-router-classifier.service b/openvino-classifier-npu/openvino-router-classifier.service new file mode 100644 index 0000000..9f44c97 --- /dev/null +++ b/openvino-classifier-npu/openvino-router-classifier.service @@ -0,0 +1,18 @@ +[Unit] +Description=Atlas/Hermes dry-run OpenVINO router classifier +After=network.target openvino-embeddings.service +Wants=openvino-embeddings.service + +[Service] +Type=simple +WorkingDirectory=/home/will/lab/swarm/openvino-classifier-npu +Environment=OPENVINO_CLASSIFIER_HOST=127.0.0.1 +Environment=OPENVINO_CLASSIFIER_PORT=18819 +Environment=OPENVINO_CLASSIFIER_EMBED_URL=http://127.0.0.1:18817/v1/embeddings +Environment=OPENVINO_CLASSIFIER_MAX_BATCH_SIZE=32 +ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-classifier-npu/router_classifier.py +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target diff --git a/openvino-classifier-npu/router_classifier.py b/openvino-classifier-npu/router_classifier.py new file mode 100644 index 0000000..c5150a1 --- /dev/null +++ b/openvino-classifier-npu/router_classifier.py @@ -0,0 +1,563 @@ +#!/usr/bin/env python3 +"""Dry-run Atlas/Hermes router classifier backed by the local OpenVINO NPU embedding service. + +Default port: 18819 +Default upstream: http://127.0.0.1:18817/v1/embeddings + +This service is intentionally advisory only. It does not write memory, mutate routing, +restart services, or call external APIs. NPU execution is proved by the upstream +embedding service's npu_busy_delta_us and by reading the local sysfs busy counter. +""" +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import sys +import time +import urllib.error +import urllib.request +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any + +VERSION = "0.1.0" +SERVICE = "atlas-router-classifier" +MODEL = "bge-base-en-v1.5-int8-ov/prototype-router-v0" +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 18819 +DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings" +DEFAULT_MAX_BATCH_SIZE = 32 +NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") + +WORKFLOW_CATEGORIES = [ + "chat", + "research", + "coding", + "debugging", + "devops", + "smart_home", + "media", + "note_taking", + "productivity", + "kanban", + "unknown", +] +MEMORY_VALUES = ["none", "user_preference", "durable_user_fact", "environment_fact", "workflow_convention", "skill_candidate"] +URGENCY_VALUES = ["low", "normal", "high", "critical"] + +PROTOTYPES: dict[str, list[str]] = { + "tool_needed": [ + "check the current date time weather news versions or live facts", + "inspect files git branches logs ports processes disk memory or system state", + "send a message create a cron job call an API or interact with a local service", + "search the web browse a website download or verify current information", + ], + "memory_user_preference": [ + "remember that I prefer concise replies and a direct style", + "my preference is use short answers and avoid unnecessary detail", + "please remember I like this convention for future sessions", + ], + "memory_durable_user_fact": [ + "remember that I live in Seattle and work on local AI infrastructure", + "my name role location identity or durable personal detail is", + ], + "memory_environment_fact": [ + "this project uses pytest and this server runs linux with openvino npu", + "remember this repository convention service port path or environment setup", + ], + "memory_workflow_convention": [ + "for this workflow use this recurring procedure convention or process", + "the team convention is to run checks before code review and use a worktree", + ], + "memory_skill_candidate": [ + "we discovered a reusable multi step workflow that should become a skill", + "save this procedure as a reusable skill after solving a tricky task", + ], + "urgency_low": [ + "whenever convenient no rush low priority idea someday backlog", + ], + "urgency_high": [ + "urgent asap high priority today please handle soon production issue", + "service is degraded broken failing down users are blocked", + ], + "urgency_critical": [ + "critical outage security incident data loss production down emergency now", + "stop the bleeding rollback immediately credentials leaked destructive incident", + ], + "workflow_chat": [ + "answer a general question explain a concept brainstorm rewrite text chat casually", + ], + "workflow_research": [ + "research compare summarize sources papers market docs web search literature review", + ], + "workflow_coding": [ + "implement code write tests refactor add feature fix type errors create a branch", + ], + "workflow_debugging": [ + "debug failing tests inspect logs reproduce error traceback diagnose regression", + ], + "workflow_devops": [ + "operate services systemd docker kubernetes ports health checks deploy infrastructure", + ], + "workflow_smart_home": [ + "turn on lights adjust thermostat control tv speaker home assistant hue wiz", + ], + "workflow_media": [ + "transcribe audio process video image gif spotify music youtube media file", + ], + "workflow_note_taking": [ + "obsidian notes daily diary memory knowledge base document personal context", + ], + "workflow_productivity": [ + "calendar email spreadsheet presentation notion airtable linear task planning", + ], + "workflow_kanban": [ + "kanban task board card assignee handoff review required blocked complete worker", + ], +} + +RULES: dict[str, list[tuple[re.Pattern[str], str, float]]] = { + "tool_needed": [ + (re.compile(r"\b(current|today|now|latest|weather|news|version|price|stock)\b", re.I), "current_fact_requested", 0.88), + (re.compile(r"\b(file|directory|git|branch|commit|diff|log|port|process|disk|memory|cpu|gpu|npu|service|systemd|reindex)\b", re.I), "local_state_requested", 0.84), + (re.compile(r"\b(send|schedule|create cron|call api|download|browse|search web|open website|turn on|turn off|set the thermostat|transcribe|restart|switch primary routing|work kanban|kanban task)\b", re.I), "external_or_tool_action_requested", 0.86), + ], + "safety": [ + (re.compile(r"\b(delete|remove|overwrite|drop|truncate|wipe|reindex|reset --hard|force push)\b", re.I), "destructive_or_irreversible_action", 0.92), + (re.compile(r"\b(restart|stop|deploy|expose|public|0\.0\.0\.0|route live|primary routing|gateway)\b", re.I), "live_service_or_routing_change", 0.88), + (re.compile(r"\b(secret|token|api key|credential|password|private document|external upload|send message|spend money|purchase)\b", re.I), "credential_privacy_or_external_side_effect", 0.9), + ], + "memory": [ + (re.compile(r"\b(remember that|please remember|don'?t forget|my preference|I prefer|call me)\b", re.I), "explicit_memory_language", 0.9), + (re.compile(r"\b(always|for future|going forward|convention|workflow|standard practice)\b", re.I), "durable_convention_language", 0.78), + ], + "urgency_high": [ + (re.compile(r"\b(urgent|asap|immediately|high priority|production|down|broken|blocked)\b", re.I), "urgent_language", 0.84), + ], + "urgency_critical": [ + (re.compile(r"\b(critical|emergency|outage|data loss|credential leak|security incident|prod down)\b", re.I), "critical_incident_language", 0.94), + ], +} + + +def npu_busy_time_us() -> int | None: + try: + return int(NPU_BUSY_FILE.read_text().strip()) + except Exception: + return None + + +def env_int(name: str, default: int) -> int: + raw = os.environ.get(name) + if raw is None: + return default + try: + return int(raw) + except ValueError as exc: + raise SystemExit(f"{name} must be an integer, got {raw!r}") from exc + + +def env_float(name: str, default: float) -> float: + raw = os.environ.get(name) + if raw is None: + return default + try: + return float(raw) + except ValueError as exc: + raise SystemExit(f"{name} must be a number, got {raw!r}") from exc + + +def clamp01(value: float) -> float: + return max(0.0, min(1.0, value)) + + +def cosine(a: list[float], b: list[float]) -> float: + if not a or not b or len(a) != len(b): + return 0.0 + dot = sum(x * y for x, y in zip(a, b)) + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(y * y for y in b)) + if na == 0.0 or nb == 0.0: + return 0.0 + # Map [-1, 1] to [0, 1] for confidence-like scoring. + return clamp01((dot / (na * nb) + 1.0) / 2.0) + + +def best_rule(text: str, group: str) -> tuple[float, list[str], list[dict[str, Any]]]: + best = 0.0 + codes: list[str] = [] + evidence: list[dict[str, Any]] = [] + for pattern, code, score in RULES.get(group, []): + match = pattern.search(text) + if match: + best = max(best, score) + codes.append(code) + evidence.append({"label": group, "source": "rule", "matched": match.group(0), "reason_code": code, "score": score}) + return best, sorted(set(codes)), evidence + + +@dataclass +class EmbedResult: + vectors: list[list[float]] + npu_busy_delta_us: int | None + duration_ms: float + embedding_dim: int | None + + +class EmbeddingClient: + def __init__(self, url: str, timeout_s: float = 30.0) -> None: + self.url = url + self.timeout_s = timeout_s + + def embed(self, texts: list[str], *, purpose: str = "query") -> EmbedResult: + payload = json.dumps({"input": texts, "purpose": purpose}).encode("utf-8") + request = urllib.request.Request( + self.url, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + started = time.perf_counter() + try: + with urllib.request.urlopen(request, timeout=self.timeout_s) as response: # noqa: S310 - local configured URL + body = response.read().decode("utf-8", "replace") + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", "replace") + raise RuntimeError(f"embedding service HTTP {exc.code}: {detail}") from exc + except urllib.error.URLError as exc: + raise RuntimeError(f"embedding service unavailable at {self.url}: {exc.reason}") from exc + data = json.loads(body) + vectors = [item["embedding"] for item in data.get("data", [])] + return EmbedResult( + vectors=[[float(x) for x in vec] for vec in vectors], + npu_busy_delta_us=data.get("npu_busy_delta_us"), + duration_ms=round((time.perf_counter() - started) * 1000, 3), + embedding_dim=data.get("embedding_dim") or (len(vectors[0]) if vectors else None), + ) + + +class ClassifierService: + def __init__(self, embed_url: str, *, timeout_s: float = 30.0, max_batch_size: int = DEFAULT_MAX_BATCH_SIZE) -> None: + self.embed_url = embed_url + self.client = EmbeddingClient(embed_url, timeout_s=timeout_s) + self.max_batch_size = max(1, int(max_batch_size)) + self.loaded_at = time.time() + self.prototype_texts: list[str] = [] + self.prototype_keys: list[str] = [] + for key, examples in PROTOTYPES.items(): + for example in examples: + self.prototype_keys.append(key) + self.prototype_texts.append(example) + self.prototype_vectors: list[list[float]] | None = None + self.prototype_npu_busy_delta_us: int | None = None + self.embedding_dim: int | None = None + self.warnings: list[str] = [] + + def warmup(self) -> None: + result = self.client.embed(self.prototype_texts, purpose="document") + self.prototype_vectors = result.vectors + self.prototype_npu_busy_delta_us = result.npu_busy_delta_us + self.embedding_dim = result.embedding_dim + if not result.npu_busy_delta_us or result.npu_busy_delta_us <= 0: + self.warnings.append("prototype embedding warmup did not report positive NPU busy delta") + + def health(self) -> dict[str, Any]: + return { + "status": "ok" if self.prototype_vectors else "starting", + "service": SERVICE, + "version": VERSION, + "mode": "dry_run", + "model": MODEL, + "embed_url": self.embed_url, + "device": "NPU-via-embedding-service", + "labels": ["tool_needed", "memory_candidate", "urgency", "workflow_category", "safety_confirmation_required"], + "embedding_dim": self.embedding_dim, + "prototype_count": len(self.prototype_texts), + "max_batch_size": self.max_batch_size, + "prototype_npu_busy_delta_us": self.prototype_npu_busy_delta_us, + "npu_busy_time_us": npu_busy_time_us(), + "uptime_s": round(time.time() - self.loaded_at, 3), + "warnings": self.warnings, + } + + def labels(self) -> dict[str, Any]: + return { + "model": MODEL, + "thresholds": { + "tool_needed": 0.72, + "memory_candidate": 0.78, + "safety_confirmation_required": 0.80, + "workflow_category": 0.52, + }, + "enums": {"memory_candidate": MEMORY_VALUES, "urgency": URGENCY_VALUES, "workflow_category": WORKFLOW_CATEGORIES}, + "limits": {"max_batch_size": self.max_batch_size}, + "prototype_ids": sorted(PROTOTYPES), + } + + def classify(self, item_id: str | None, text: str, options: dict[str, Any] | None = None) -> dict[str, Any]: + if self.prototype_vectors is None: + self.warmup() + options = options or {} + include_evidence = bool(options.get("include_evidence", True)) + include_embedding_debug = bool(options.get("include_embedding_debug", False)) + dry_run = bool(options.get("dry_run", True)) + started = time.perf_counter() + text = str(text or "") + if not text.strip(): + raise ValueError("text must be a non-empty string") + + sysfs_before = npu_busy_time_us() + embedded = self.client.embed([text], purpose="query") + sysfs_after = npu_busy_time_us() + if not embedded.vectors: + raise RuntimeError("embedding service returned no vectors") + message_vec = embedded.vectors[0] + similarities = self._prototype_scores(message_vec) + + evidence: list[dict[str, Any]] = [] + labels: dict[str, Any] = {} + + tool_rule, tool_codes, tool_evidence = best_rule(text, "tool_needed") + tool_proto = max([similarities.get("tool_needed", 0.0)], default=0.0) + # Similarity alone is too broad for action classification; require either + # a deterministic rule hit or a very strong prototype match. + tool_conf = round(max(tool_rule, tool_proto if tool_proto >= 0.88 else 0.0), 3) + labels["tool_needed"] = {"value": tool_conf >= 0.72, "confidence": tool_conf, "threshold": 0.72, "reason_codes": tool_codes} + evidence.extend(tool_evidence) + if tool_proto > 0: + evidence.append({"label": "tool_needed", "source": "prototype_similarity", "prototype": "tool_needed", "score": round(tool_proto, 3)}) + + mem_label, mem_conf, mem_codes, mem_ev = self._memory_label(text, similarities) + labels["memory_candidate"] = {"value": mem_label, "confidence": round(mem_conf, 3), "threshold": 0.78, "reason_codes": mem_codes} + evidence.extend(mem_ev) + + urgency_value, urgency_conf, urgency_scores, urgency_codes, urgency_ev = self._urgency_label(text, similarities) + labels["urgency"] = {"value": urgency_value, "confidence": round(urgency_conf, 3), "scores": {k: round(v, 3) for k, v in urgency_scores.items()}, "reason_codes": urgency_codes} + evidence.extend(urgency_ev) + + workflow_value, workflow_conf, workflow_scores, workflow_ev = self._workflow_label(similarities, text) + labels["workflow_category"] = {"value": workflow_value, "confidence": round(workflow_conf, 3), "scores": {k: round(v, 3) for k, v in workflow_scores.items()}} + evidence.extend(workflow_ev) + + safety_rule, safety_codes, safety_evidence = best_rule(text, "safety") + safety_proto = 0.0 + safety_conf = round(max(safety_rule, safety_proto), 3) + labels["safety_confirmation_required"] = {"value": safety_conf >= 0.80, "confidence": safety_conf, "threshold": 0.80, "reason_codes": safety_codes} + evidence.extend(safety_evidence) + + npu_delta = embedded.npu_busy_delta_us + sysfs_delta = None if sysfs_before is None or sysfs_after is None else sysfs_after - sysfs_before + warnings = list(self.warnings) + if not npu_delta or npu_delta <= 0: + warnings.append("embedding call did not report positive npu_busy_delta_us; NPU execution not proven for this request") + if sysfs_delta is not None and sysfs_delta <= 0: + warnings.append("sysfs npu_busy_time_us did not increase during classification request") + + response: dict[str, Any] = { + "id": item_id, + "model": MODEL, + "created": int(time.time()), + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "npu_busy_delta_us": npu_delta, + "sysfs_npu_busy_delta_us": sysfs_delta, + "dry_run": dry_run, + "labels": labels, + "warnings": warnings, + } + if include_evidence: + response["evidence"] = evidence[:30] + if include_embedding_debug: + response["embedding_debug"] = {"embedding_dim": len(message_vec), "prototype_scores": {k: round(v, 3) for k, v in similarities.items()}} + return response + + def batch_classify(self, items: list[dict[str, Any]], options: dict[str, Any] | None = None) -> dict[str, Any]: + if not items: + raise ValueError("items must contain at least one classification request") + if len(items) > self.max_batch_size: + raise ValueError(f"items exceeds max_batch_size={self.max_batch_size}") + started = time.perf_counter() + results = [self.classify(item.get("id"), str(item.get("text") or ""), options) for item in items] + return { + "model": MODEL, + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "npu_busy_delta_us": sum((r.get("npu_busy_delta_us") or 0) for r in results), + "results": results, + } + + def _prototype_scores(self, vec: list[float]) -> dict[str, float]: + assert self.prototype_vectors is not None + scores: dict[str, float] = {} + for key, prototype_vec in zip(self.prototype_keys, self.prototype_vectors): + scores[key] = max(scores.get(key, 0.0), cosine(vec, prototype_vec)) + return scores + + def _memory_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, list[str], list[dict[str, Any]]]: + rule_score, codes, evidence = best_rule(text, "memory") + candidates = { + "user_preference": scores.get("memory_user_preference", 0.0), + "durable_user_fact": scores.get("memory_durable_user_fact", 0.0), + "environment_fact": scores.get("memory_environment_fact", 0.0), + "workflow_convention": scores.get("memory_workflow_convention", 0.0), + "skill_candidate": scores.get("memory_skill_candidate", 0.0), + } + label, proto_score = max(candidates.items(), key=lambda kv: kv[1]) + confidence = max(proto_score, rule_score) + explicit_memory = rule_score >= 0.78 + durable_fact_hint = bool(re.search(r"\b(project uses|repo uses|environment uses|runs on|standard practice|convention|workflow convention)\b", text, re.I)) + if explicit_memory: + if re.search(r"\b(prefer|preference|call me|my name|I live|I am)\b", text, re.I): + label = "user_preference" if re.search(r"\b(prefer|preference)\b", text, re.I) else "durable_user_fact" + elif durable_fact_hint: + label = "environment_fact" + elif re.search(r"\b(skill|procedure|workflow)\b", text, re.I): + label = "skill_candidate" + # BGE prototype similarities are advisory but broad; avoid recommending + # memory writes from similarity alone unless the text also has durable- + # fact language or an unusually strong prototype match. + if confidence < 0.78 or (not explicit_memory and not durable_fact_hint and proto_score < 0.88): + label = "none" + else: + evidence.append({"label": "memory_candidate", "source": "prototype_similarity", "prototype": f"memory_{label}", "score": round(proto_score, 3)}) + return label, confidence if label != "none" else max(0.0, min(confidence, 0.77)), codes, evidence + + def _urgency_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, dict[str, float], list[str], list[dict[str, Any]]]: + high_rule, high_codes, high_ev = best_rule(text, "urgency_high") + critical_rule, critical_codes, critical_ev = best_rule(text, "urgency_critical") + low_rule = 0.82 if re.search(r"\b(no rush|whenever convenient|low priority|someday|backlog)\b", text, re.I) else 0.0 + # Urgency is safety-sensitive for notifications, so require explicit + # language instead of relying on broad prototype similarity. + score_map = { + # Urgency should be explicit; broad embedding similarity otherwise + # turns neutral requests such as "what time is it" into low/high/critical urgency. + "low": low_rule, + "normal": 0.68, + "high": high_rule, + "critical": critical_rule, + } + if score_map["critical"] >= 0.9: + score_map["normal"] = 0.05 + elif score_map["high"] >= 0.8 or score_map["low"] >= 0.8: + score_map["normal"] = 0.2 + value, confidence = max(score_map.items(), key=lambda kv: kv[1]) + evidence = high_ev + critical_ev + return value, confidence, score_map, sorted(set(high_codes + critical_codes)), evidence + + def _workflow_label(self, scores: dict[str, float], text: str = "") -> tuple[str, float, dict[str, float], list[dict[str, Any]]]: + score_map = {category: scores.get(f"workflow_{category}", 0.0) for category in WORKFLOW_CATEGORIES if category != "unknown"} + rule_patterns: list[tuple[str, str]] = [ + ("chat", r"\bwhat time is it|what date is it|general question\b"), + ("kanban", r"\bkanban|task card|review-required|blocked\b"), + ("smart_home", r"\blights?|thermostat|home assistant|hue|wiz\b"), + ("media", r"\btranscribe|voice memo|audio|video|image|spotify|youtube\b"), + ("research", r"\bresearch|compare sources|papers?|literature|web search\b"), + ("devops", r"\bsystemd|docker|kubernetes|service|ports?|gateway|deploy|infrastructure\b"), + ("debugging", r"\bdebug|failing|traceback|logs?|reproduce|diagnose\b"), + ("coding", r"\bimplement|code|pytest|refactor|feature|PR\b"), + ("note_taking", r"\bobsidian|notes?|memory|diary|chroma|reindex\b"), + ("productivity", r"\bcalendar|email|spreadsheet|presentation|notion|airtable|linear\b"), + ] + rule_value: str | None = None + for category, pattern in rule_patterns: + if re.search(pattern, text, re.I): + rule_value = category + break + if rule_value: + value = rule_value + confidence = max(0.86, score_map.get(rule_value, 0.0)) + score_map[rule_value] = confidence + source = "rule" + else: + value, confidence = max(score_map.items(), key=lambda kv: kv[1]) + source = "prototype_similarity" + if confidence < 0.52: + value = "unknown" + confidence = 0.52 + score_map["unknown"] = 1.0 - confidence if value != "unknown" else confidence + evidence = [{"label": "workflow_category", "source": source, "prototype": f"workflow_{value}", "score": round(confidence, 3)}] + return value, confidence, score_map, evidence + + +class Handler(BaseHTTPRequestHandler): + server_version = "AtlasRouterClassifier/0.1" + + @property + def svc(self) -> ClassifierService: + return self.server.classifier_service # type: ignore[attr-defined] + + def do_GET(self) -> None: + path = self.path.split("?", 1)[0].rstrip("/") or "/" + if path in {"/", "/healthz", "/readyz", "/health"}: + self.write_json(self.svc.health()) + elif path == "/v1/labels": + self.write_json(self.svc.labels()) + else: + self.write_json({"error": "not found"}, status=404) + + def do_POST(self) -> None: + path = self.path.split("?", 1)[0].rstrip("/") or "/" + try: + payload = self.read_json() + options = payload.get("options") if isinstance(payload.get("options"), dict) else {} + if path == "/v1/classify": + self.write_json(self.svc.classify(payload.get("id"), str(payload.get("text") or ""), options)) + elif path == "/v1/batch_classify": + items = payload.get("items") + if not isinstance(items, list): + raise ValueError("items must be a list") + self.write_json(self.svc.batch_classify(items, options)) + else: + self.write_json({"error": "not found"}, status=404) + except ValueError as exc: + self.write_json({"error": str(exc)}, status=400) + except Exception as exc: + self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500) + + def read_json(self) -> dict[str, Any]: + length = int(self.headers.get("Content-Length") or 0) + body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}" + payload = json.loads(body or "{}") + if not isinstance(payload, dict): + raise ValueError("JSON body must be an object") + return payload + + def write_json(self, payload: dict[str, Any], status: int = 200) -> None: + body = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name + print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Dry-run Atlas/Hermes router classifier") + parser.add_argument("--host", default=os.environ.get("OPENVINO_CLASSIFIER_HOST", DEFAULT_HOST)) + parser.add_argument("--port", type=int, default=env_int("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT)) + parser.add_argument("--embed-url", default=os.environ.get("OPENVINO_CLASSIFIER_EMBED_URL", DEFAULT_EMBED_URL)) + parser.add_argument("--timeout-s", type=float, default=env_float("OPENVINO_CLASSIFIER_TIMEOUT_S", 30.0)) + parser.add_argument("--max-batch-size", type=int, default=env_int("OPENVINO_CLASSIFIER_MAX_BATCH_SIZE", DEFAULT_MAX_BATCH_SIZE)) + parser.add_argument("--no-warmup", action="store_true", help="skip prototype embedding warmup until first request") + args = parser.parse_args() + + service = ClassifierService(args.embed_url, timeout_s=args.timeout_s, max_batch_size=args.max_batch_size) + if not args.no_warmup: + service.warmup() + httpd = ThreadingHTTPServer((args.host, args.port), Handler) + httpd.classifier_service = service # type: ignore[attr-defined] + print(f"{SERVICE} listening on {args.host}:{args.port} embed_url={args.embed_url} mode=dry_run", flush=True) + try: + httpd.serve_forever() + except KeyboardInterrupt: + pass + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/openvino-classifier-npu/smoke_classifier.py b/openvino-classifier-npu/smoke_classifier.py new file mode 100644 index 0000000..4f3eb41 --- /dev/null +++ b/openvino-classifier-npu/smoke_classifier.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +"""Local-only smoke test for the dry-run OpenVINO router classifier. + +This script uses only synthetic fixture messages. It assumes router_classifier.py is +already running on localhost and never installs/enables a persistent service. +""" +from __future__ import annotations + +import argparse +import json +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path +from typing import Any + +DEFAULT_BASE_URL = "http://127.0.0.1:18819" +BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") +FIXTURE = Path(__file__).resolve().parent / "fixtures" / "atlas_hermes_messages.jsonl" + + +def npu_busy_time_us() -> int | None: + try: + return int(BUSY_FILE.read_text().strip()) + except Exception: + return None + + +def get_json(url: str, timeout_s: float) -> dict[str, Any]: + with urllib.request.urlopen(url, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL + return json.loads(response.read().decode("utf-8")) + + +def post_json(url: str, payload: dict[str, Any], timeout_s: float) -> dict[str, Any]: + request = urllib.request.Request( + url, + data=json.dumps(payload).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(request, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL + return json.loads(response.read().decode("utf-8")) + + +def load_fixture(limit: int) -> list[dict[str, Any]]: + rows = [json.loads(line) for line in FIXTURE.read_text().splitlines() if line.strip()] + return rows[:limit] + + +def assert_expected(result: dict[str, Any], expected: dict[str, Any]) -> list[str]: + failures: list[str] = [] + labels = result.get("labels", {}) + for key, value in expected.items(): + actual_label = labels.get(key, {}) + actual_value = actual_label.get("value") + if actual_value != value: + failures.append(f"{result.get('id')}: {key} expected {value!r}, got {actual_value!r}") + return failures + + +def main() -> int: + parser = argparse.ArgumentParser(description="Smoke-test a running localhost router classifier") + parser.add_argument("--base-url", default=DEFAULT_BASE_URL) + parser.add_argument("--timeout-s", type=float, default=30.0) + parser.add_argument("--limit", type=int, default=10) + args = parser.parse_args() + + if not args.base_url.startswith("http://127.0.0.1:") and not args.base_url.startswith("http://localhost:"): + raise SystemExit("refusing non-local base URL; this smoke is localhost-only") + + before = npu_busy_time_us() + started = time.perf_counter() + try: + health = get_json(f"{args.base_url.rstrip('/')}/healthz", args.timeout_s) + labels = get_json(f"{args.base_url.rstrip('/')}/v1/labels", args.timeout_s) + rows = load_fixture(args.limit) + results = [] + failures: list[str] = [] + for row in rows: + result = post_json( + f"{args.base_url.rstrip('/')}/v1/classify", + {"id": row["id"], "text": row["text"], "options": {"include_evidence": False, "dry_run": True}}, + args.timeout_s, + ) + results.append(result) + failures.extend(assert_expected(result, row.get("expected", {}))) + after = npu_busy_time_us() + except urllib.error.URLError as exc: + raise SystemExit(f"smoke failed: {exc}") from exc + + response_npu_delta = sum((r.get("npu_busy_delta_us") or 0) for r in results) + outer_sysfs_delta = None if before is None or after is None else after - before + npu_proven = response_npu_delta > 0 and (outer_sysfs_delta is None or outer_sysfs_delta > 0) + summary = { + "ok": not failures, + "service": health.get("service"), + "mode": health.get("mode"), + "model": health.get("model"), + "label_count": len(labels.get("prototype_ids", [])), + "fixture_count": len(results), + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "response_npu_busy_delta_us": response_npu_delta, + "outer_sysfs_npu_busy_delta_us": outer_sysfs_delta, + "npu_proven": npu_proven, + "failures": failures, + } + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 if not failures and npu_proven else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/openvino-classifier-npu/tests/test_router_classifier.py b/openvino-classifier-npu/tests/test_router_classifier.py new file mode 100644 index 0000000..5c25ccd --- /dev/null +++ b/openvino-classifier-npu/tests/test_router_classifier.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import importlib.util +import json +import sys +import unittest +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +MODULE_PATH = ROOT / "router_classifier.py" +spec = importlib.util.spec_from_file_location("router_classifier", MODULE_PATH) +assert spec and spec.loader +router_classifier = importlib.util.module_from_spec(spec) +sys.modules["router_classifier"] = router_classifier +spec.loader.exec_module(router_classifier) + + +class FakeClient: + def embed(self, texts, *, purpose="query"): + # Deterministic toy embeddings based on keyword buckets. The tests focus on + # rule safety and API shape; live smoke tests cover the real NPU upstream. + vectors = [] + for text in texts: + t = text.lower() + vec = [0.0] * 8 + if any(w in t for w in ["time", "current", "weather", "news", "port", "git", "logs", "systemd"]): + vec[0] = 1.0 + if any(w in t for w in ["remember", "prefer", "preference"]): + vec[1] = 1.0 + if any(w in t for w in ["urgent", "down", "outage", "critical"]): + vec[2] = 1.0 + if any(w in t for w in ["code", "pytest", "debug", "git", "diff"]): + vec[3] = 1.0 + if any(w in t for w in ["service", "systemd", "port", "gateway", "docker"]): + vec[4] = 1.0 + if any(w in t for w in ["kanban", "task", "blocked", "review"]): + vec[5] = 1.0 + if any(w in t for w in ["light", "thermostat"]): + vec[6] = 1.0 + if any(w in t for w in ["transcribe", "voice", "memo", "audio"]): + vec[7] = 1.0 + if not any(vec): + vec[0] = 0.2 + vectors.append(vec) + return router_classifier.EmbedResult(vectors=vectors, npu_busy_delta_us=123, duration_ms=1.0, embedding_dim=8) + + +class RouterClassifierTests(unittest.TestCase): + def service(self): + svc = router_classifier.ClassifierService("http://fake.local/v1/embeddings") + svc.client = FakeClient() + svc.warmup() + return svc + + def test_health_and_label_schema(self): + svc = self.service() + health = svc.health() + self.assertEqual(health["service"], "atlas-router-classifier") + self.assertEqual(health["mode"], "dry_run") + self.assertIn("tool_needed", health["labels"]) + labels = svc.labels() + self.assertIn("workflow_category", labels["enums"]) + self.assertIn("safety_confirmation_required", labels["thresholds"]) + + def test_explicit_preference_is_memory_candidate(self): + result = self.service().classify("pref", "Remember that I prefer concise terminal replies.") + self.assertEqual(result["labels"]["memory_candidate"]["value"], "user_preference") + self.assertGreaterEqual(result["labels"]["memory_candidate"]["confidence"], 0.78) + self.assertFalse(result["labels"]["safety_confirmation_required"]["value"]) + + def test_current_local_state_needs_tool(self): + result = self.service().classify("port", "Check whether port 18819 is listening and inspect systemd logs.") + self.assertTrue(result["labels"]["tool_needed"]["value"]) + self.assertIn("local_state_requested", result["labels"]["tool_needed"]["reason_codes"]) + + def test_live_gateway_restart_requires_confirmation(self): + result = self.service().classify("safe", "Restart the live Atlas gateway and switch primary routing.") + self.assertTrue(result["labels"]["safety_confirmation_required"]["value"]) + self.assertIn("live_service_or_routing_change", result["labels"]["safety_confirmation_required"]["reason_codes"]) + + def test_batch_shape(self): + result = self.service().batch_classify([ + {"id": "a", "text": "What time is it?"}, + {"id": "b", "text": "Delete the existing collection and reindex it in place."}, + ]) + self.assertEqual(result["model"], router_classifier.MODEL) + self.assertEqual(len(result["results"]), 2) + self.assertGreater(result["npu_busy_delta_us"], 0) + + def test_batch_limits_are_enforced(self): + svc = self.service() + with self.assertRaisesRegex(ValueError, "at least one"): + svc.batch_classify([]) + too_many = [{"id": str(i), "text": "What time is it?"} for i in range(router_classifier.DEFAULT_MAX_BATCH_SIZE + 1)] + with self.assertRaisesRegex(ValueError, "max_batch_size"): + svc.batch_classify(too_many) + + def test_fixture_file_is_valid_jsonl(self): + fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl" + rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()] + self.assertGreaterEqual(len(rows), 8) + for row in rows: + self.assertIn("id", row) + self.assertIn("text", row) + self.assertIn("expected", row) + + def test_synthetic_fixture_expectations(self): + svc = self.service() + fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl" + rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()] + for row in rows: + with self.subTest(row=row["id"]): + result = svc.classify(row["id"], row["text"], {"include_evidence": False}) + labels = result["labels"] + for label_name, expected_value in row["expected"].items(): + self.assertEqual(labels[label_name]["value"], expected_value) + + +if __name__ == "__main__": + unittest.main()