feat(npu): add dry-run classifier router prototype

This commit is contained in:
William Valentin
2026-06-04 13:07:51 -07:00
parent 0683253157
commit ea452886f3
7 changed files with 1305 additions and 0 deletions
+339
View File
@@ -0,0 +1,339 @@
# OpenVINO NPU classifier/router dry-run contract
Status: specification for dry-run prototype refresh
Target port: `127.0.0.1:18819`
Owner context: Atlas/Hermes local assistant sidecar evaluation
This service is an advisory classifier for Atlas/Hermes automation hints. It may suggest labels such as tool-needed, memory-candidate type, urgency, workflow category, and safety-confirmation-required, but it must not make or enforce live routing, memory, tool, or safety decisions without a separate explicit approval from Will.
## Recommended model and runtime
Recommended v1 runtime: small local Python HTTP/CLI service backed by the existing OpenVINO NPU embeddings service on `127.0.0.1:18817`.
Recommended v1 model shape:
- Primary signal: `bge-base-en-v1.5-int8-ov` embeddings from the live embeddings service.
- Classifier layer: inspectable deterministic rules plus cosine similarity against curated synthetic/prototype utterances.
- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0`.
- Device proof: request-level `npu_busy_delta_us` from `:18817` plus direct sysfs before/after reads from `/sys/class/accel/accel0/device/npu_busy_time_us`.
Why this is preferred for the dry run:
1. It reuses the already-live NPU embeddings path rather than adding a second model conversion/runtime dependency before contract validation.
2. Rules and prototypes are transparent enough for safety-sensitive routing hints; a reviewer can inspect why a message was labeled.
3. It avoids fine-tuning or training on private Atlas/Hermes transcripts.
4. It keeps the service small, localhost-only, and easy to start/stop during smoke tests.
5. It produces NPU activity through the embeddings path while making clear that final decision logic remains advisory.
Defer a dedicated NPU sequence-classification model such as TinyBERT/MiniLM until the dry-run labels and thresholds have been evaluated against synthetic fixtures and explicitly-approved non-private examples. If pursued later, use OpenVINO Runtime/Optimum export with fixed input shapes suitable for NPU, and keep the rule layer for safety gates.
## Non-goals and safety invariants
The service must not:
- Change Hermes/Atlas model routing, gateway routing, memory writes, tool-use permissions, or safety-confirmation behavior.
- Restart, stop, enable, or persist any live Atlas/Hermes/gateway/RAG service.
- Bind to anything broader than `127.0.0.1` by default.
- Mutate Chroma/vector collections, trigger reindexing, or write to RAG state.
- Process private document/image directories or private transcript dumps for smoke testing.
- Log raw prompts by default beyond normal foreground stderr during local review.
- Claim NPU success from HTTP 200 alone.
## Endpoint contract
All HTTP endpoints are local-only by default.
Base URL:
```text
http://127.0.0.1:18819
```
### GET `/healthz`, `/health`, `/readyz`, `/`
Purpose: liveness/readiness metadata.
Response fields:
- `status`: `starting | ok`
- `service`: `atlas-router-classifier`
- `version`: service version string
- `mode`: always `dry_run`
- `model`: model/runtime label
- `embed_url`: upstream embeddings URL
- `device`: expected to say `NPU-via-embedding-service` or equivalent
- `labels`: supported label names
- `embedding_dim`: embedding dimension after warmup
- `prototype_count`: number of synthetic prototype examples loaded
- `prototype_npu_busy_delta_us`: warmup delta reported by upstream embeddings, if available
- `npu_busy_time_us`: current sysfs counter value, if readable
- `warnings`: list of non-fatal warnings
A healthy service is not enough to prove NPU execution. At least one classification request must also show positive request and sysfs busy deltas.
### GET `/v1/labels`
Purpose: publish schema information without dumping private examples.
Response fields:
- `model`
- `thresholds`
- `tool_needed`: recommended threshold `0.72`
- `memory_candidate`: recommended threshold `0.78`
- `safety_confirmation_required`: recommended threshold `0.80`
- `workflow_category`: recommended threshold `0.52`
- `enums`
- `memory_candidate`: `none`, `user_preference`, `durable_user_fact`, `environment_fact`, `workflow_convention`, `skill_candidate`
- `urgency`: `low`, `normal`, `high`, `critical`
- `workflow_category`: `chat`, `research`, `coding`, `debugging`, `devops`, `smart_home`, `media`, `note_taking`, `productivity`, `kanban`, `unknown`
- `prototype_ids`: names of curated synthetic prototype buckets
### POST `/v1/classify`
Purpose: classify one user/task message for advisory dry-run hints.
Request:
```json
{
"id": "optional-trace-id",
"text": "Urgent: check whether port 18817 is listening and inspect systemd logs.",
"context": {
"platform": "cli",
"source": "user"
},
"options": {
"include_evidence": true,
"include_embedding_debug": false,
"dry_run": true
}
}
```
Required behavior:
- Reject empty text with HTTP 400.
- Default `dry_run` to true.
- Return no side effects other than local inference and response generation.
- Include evidence by default unless `include_evidence=false`.
- Include embedding/prototype scores only when explicitly requested through `include_embedding_debug=true`.
Response:
```json
{
"id": "optional-trace-id",
"model": "bge-base-en-v1.5-int8-ov/prototype-router-v0",
"created": 1780590000,
"duration_ms": 12.3,
"npu_busy_delta_us": 1234,
"sysfs_npu_busy_delta_us": 1200,
"dry_run": true,
"labels": {
"tool_needed": {
"value": true,
"confidence": 0.84,
"threshold": 0.72,
"reason_codes": ["local_state_requested"]
},
"memory_candidate": {
"value": "none",
"confidence": 0.31,
"threshold": 0.78,
"reason_codes": []
},
"urgency": {
"value": "high",
"confidence": 0.84,
"scores": {"low": 0.0, "normal": 0.2, "high": 0.84, "critical": 0.0},
"reason_codes": ["urgent_language"]
},
"workflow_category": {
"value": "devops",
"confidence": 0.86,
"scores": {"devops": 0.86, "unknown": 0.14}
},
"safety_confirmation_required": {
"value": false,
"confidence": 0.0,
"threshold": 0.8,
"reason_codes": []
}
},
"warnings": [],
"evidence": []
}
```
### POST `/v1/batch_classify`
Purpose: classify a bounded batch of non-private synthetic or explicitly-approved messages.
Request:
```json
{
"items": [
{"id": "m1", "text": "What time is it in Seattle right now?"},
{"id": "m2", "text": "Restart the live Atlas gateway and switch primary routing."}
],
"options": {"include_evidence": false, "dry_run": true}
}
```
Response:
- `model`
- `duration_ms`
- aggregate `npu_busy_delta_us`
- `results`: array of `/v1/classify` responses
Batch limits for prototype review:
- Keep batches small; the prototype rejects empty batches and batches larger than `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE` (default `32`).
- Use only synthetic fixtures unless Will explicitly approves a real non-private sample set.
- Do not retain request bodies to disk.
## CLI contract
The same implementation should support foreground review from the service directory:
```bash
cd /home/will/lab/swarm/openvino-classifier-npu
/home/will/.venvs/npu/bin/python router_classifier.py \
--host 127.0.0.1 \
--port 18819 \
--embed-url http://127.0.0.1:18817/v1/embeddings
```
Required flags/env:
- `--host` / `OPENVINO_CLASSIFIER_HOST`; default `127.0.0.1`.
- `--port` / `OPENVINO_CLASSIFIER_PORT`; default `18819`.
- `--embed-url` / `OPENVINO_CLASSIFIER_EMBED_URL`; default `http://127.0.0.1:18817/v1/embeddings`.
- `--timeout-s` / `OPENVINO_CLASSIFIER_TIMEOUT_S`; default `30`.
- `--max-batch-size` / `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`; default `32`.
- `--no-warmup` to defer prototype embedding until first request.
A future dedicated CLI mode may be added for one-shot JSONL classification, but foreground HTTP review is sufficient for the dry-run contract.
## Synthetic smoke-test plan
Preconditions:
1. Confirm `:18817` embeddings service is healthy.
2. Confirm `:18819` is not already listening.
3. Read `/sys/class/accel/accel0/device/npu_busy_time_us` before starting the request smoke.
4. Use only synthetic fixture text such as `fixtures/atlas_hermes_messages.jsonl`.
Unit/schema smoke, no NPU dependency:
```bash
cd /home/will/lab/swarm
/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v
```
Foreground service smoke:
```bash
ss -ltnp | grep ':18819\b' || true
cd /home/will/lab/swarm/openvino-classifier-npu
/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819
```
From another shell:
```bash
curl -fsS http://127.0.0.1:18819/healthz | jq .
curl -fsS http://127.0.0.1:18819/v1/labels | jq .
curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"smoke-devops","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true,"dry_run":true}}' | jq .
curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"smoke-safety","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","options":{"include_evidence":true,"dry_run":true}}' | jq .
```
Expected label checks:
- `smoke-devops`: `tool_needed.value=true`, `urgency.value=high`, `workflow_category.value=devops`.
- `smoke-safety`: `safety_confirmation_required.value=true`, no actual restart or routing change.
- Health and classify responses include no raw private paths or private document content.
Shutdown:
- Stop the foreground server with Ctrl-C.
- Re-run `ss -ltnp | grep ':18819\b' || true` and confirm no listener remains.
## NPU busy-time verification plan
Use sysfs plus service response fields; do not accept HTTP 200 alone.
```bash
BUSY=/sys/class/accel/accel0/device/npu_busy_time_us
before=$(cat "$BUSY")
response=$(curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"npu-proof","text":"Check current systemd service status for the embeddings service.","options":{"include_evidence":false,"dry_run":true}}')
after=$(cat "$BUSY")
echo "$response" | jq '{npu_busy_delta_us, sysfs_npu_busy_delta_us, warnings}'
echo "outer_sysfs_npu_busy_delta_us=$((after-before))"
```
Optional localhost smoke helper, after starting the foreground service:
```bash
/home/will/.venvs/npu/bin/python openvino-classifier-npu/smoke_classifier.py \
--base-url http://127.0.0.1:18819
```
Acceptance for an NPU-backed classification request:
- HTTP request succeeds.
- Response `npu_busy_delta_us > 0` from upstream embeddings.
- Response `sysfs_npu_busy_delta_us > 0` when sysfs is readable.
- Outer shell `after-before > 0`.
- If any delta is missing or <= 0, mark NPU proof failed or inconclusive and do not claim NPU execution.
## Docs and diagram implications
If this prototype is refreshed or reviewed, update documentation to show:
- Live baseline remains RAG `:18810`, RAG health `:18814`, Whisper NPU `:18816`, and embeddings `:18817`.
- Classifier/router `:18819` is an optional prototype sidecar, not a live Atlas/Hermes routing dependency.
- Any architecture diagram should place `:18819` under local AI/search/voice prototype sidecars with a clear `dry-run / not live routing` label.
- Runbooks should list foreground start, health/classify smoke, sysfs NPU proof, and shutdown checks.
- Service catalog entries should state `not installed/enabled` until Will approves persistent service enablement.
- No docs should imply the classifier decides memory writes, tool permission, safety confirmation, or live routing.
Relevant docs inventory:
- `docs/swarm-infrastructure.md`
- `docs/swarm-infrastructure.html`
- `docs/diagram-maintenance.md`
- `swarm-common/obsidian-vault/will/will-shared-zap/Runbooks/OpenVINO NPU Services Runbook.md`
- `swarm-common/obsidian-vault/will/will-shared-zap/Resources/Service Catalog.md`
## No-go / defer criteria
Do not proceed to implementation refresh, persistent service enablement, or live integration if any of the following hold:
- `:18817` embeddings is unavailable and no approved NPU embedding fallback exists.
- `/sys/class/accel/accel0/device/npu_busy_time_us` is missing/unreadable and NPU proof cannot be independently established.
- Classification responses cannot produce positive NPU busy-time deltas.
- `:18819` is already occupied by an unknown or live service.
- Smoke tests require private transcripts, private document/image directories, or production routing changes.
- Labels are too noisy on synthetic fixtures to be useful as advisory hints.
- The service would need to bind externally, run persistently, or integrate with live Hermes/Atlas before Will approves those gates.
- Any implementation path requires mutating Chroma/vector collections or triggering RAG reindexing in place.
## Implementation handoff notes
Recommended next engineer actions:
1. Verify or refresh `openvino-classifier-npu/router_classifier.py` to match this contract.
2. Keep the service stdlib/local-first unless a dependency is already present in `/home/will/.venvs/npu`.
3. Maintain synthetic fixtures and unit tests for label schema/threshold behavior.
4. Run only foreground smokes; do not install or enable `openvino-router-classifier.service`.
5. Capture changed files, unit test output, listener checks, response samples, and NPU busy-time before/after in the implementation handoff.
+141
View File
@@ -0,0 +1,141 @@
# OpenVINO NPU router classifier prototype
Dry-run Atlas/Hermes message classifier/router prototype.
The detailed dry-run contract is in [`CONTRACT.md`](./CONTRACT.md), including the
recommended model/runtime, HTTP/CLI schema, smoke-test plan, NPU busy-time proof,
docs/diagram implications, and no-go/defer criteria.
It reuses the existing OpenVINO NPU embeddings service on `127.0.0.1:18817` and
serves an inspectable stdlib HTTP API on `127.0.0.1:18819`. It does not change
live Hermes/Atlas routing, write memory, mutate vector collections, restart
services, or send external messages.
## Runtime shape
- Service: `atlas-router-classifier`
- Default port: `18819`
- Default bind: `127.0.0.1`
- Upstream: `http://127.0.0.1:18817/v1/embeddings`
- Batch limit: `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`, default `32`
- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0`
- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` before/after plus upstream `npu_busy_delta_us`
The classifier uses deterministic high-precision rules for safety/urgency/tool
signals plus cosine similarity against curated embedding prototypes for workflow
and memory recommendations. This is intentionally tunable without model training.
## API
### GET `/healthz`
Returns service metadata, labels, prototype count, NPU sysfs counter, and warmup
NPU delta.
### GET `/v1/labels`
Returns label enum values, thresholds, and prototype IDs without dumping private
fixtures.
### POST `/v1/classify`
Request:
```json
{
"id": "optional trace id",
"text": "User message or task body to classify.",
"context": {"platform": "cli", "source": "user"},
"options": {
"include_evidence": true,
"include_embedding_debug": false,
"dry_run": true
}
}
```
Response includes:
- `labels.tool_needed`: boolean, confidence, threshold, reason codes
- `labels.memory_candidate`: `none | user_preference | durable_user_fact | environment_fact | workflow_convention | skill_candidate`
- `labels.urgency`: `low | normal | high | critical`
- `labels.workflow_category`: `chat | research | coding | debugging | devops | smart_home | media | note_taking | productivity | kanban | unknown`
- `labels.safety_confirmation_required`: boolean, confidence, reason codes
- `npu_busy_delta_us` and `sysfs_npu_busy_delta_us`
- `evidence` when requested
### POST `/v1/batch_classify`
Request:
```json
{
"items": [{"id": "m1", "text": "What time is it?"}],
"options": {"include_evidence": false, "dry_run": true}
}
```
## Local smoke test
Check that the proposed port is free first:
```bash
ss -ltnp | grep ':18819' || true
```
Run without installing anything extra; `/home/will/.venvs/npu` already has the
stdlib plus requests/openvino stack used by the upstream embeddings service:
```bash
cd /home/will/lab/swarm/openvino-classifier-npu
/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819
```
Environment variables mirror the flags: `OPENVINO_CLASSIFIER_HOST`,
`OPENVINO_CLASSIFIER_PORT`, `OPENVINO_CLASSIFIER_EMBED_URL`,
`OPENVINO_CLASSIFIER_TIMEOUT_S`, and `OPENVINO_CLASSIFIER_MAX_BATCH_SIZE`.
Then from another shell:
```bash
curl -fsS http://127.0.0.1:18819/healthz | jq .
curl -fsS http://127.0.0.1:18819/v1/classify \
-H 'Content-Type: application/json' \
-d '{"id":"smoke","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true}}' | jq .
```
A valid NPU-backed response must have positive `npu_busy_delta_us`; HTTP 200 by
itself is not considered proof.
Synthetic fixture smoke helper, after the foreground service is running:
```bash
/home/will/.venvs/npu/bin/python smoke_classifier.py --base-url http://127.0.0.1:18819
```
The helper refuses non-local URLs, checks fixture label expectations, and prints
response plus outer sysfs NPU busy deltas.
## Tests
Unit tests use a fake embedding client and do not touch the NPU:
```bash
/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v
```
Fixture messages live at `fixtures/atlas_hermes_messages.jsonl`.
## Optional systemd user unit
A draft unit is included as `openvino-router-classifier.service`. Install only
after review/approval:
```bash
cp openvino-router-classifier.service ~/.config/systemd/user/openvino-router-classifier.service
systemctl --user daemon-reload
systemctl --user start openvino-router-classifier.service
systemctl --user status openvino-router-classifier.service --no-pager
```
Do not enable it at boot or connect it to live Atlas/Hermes routing as part of this prototype task without explicit approval. Keep classifier decisions dry-run until a separate approved routing change lands.
@@ -0,0 +1,10 @@
{"id":"tool-time","text":"What time is it in Seattle right now?","expected":{"tool_needed":true,"workflow_category":"chat","urgency":"normal","safety_confirmation_required":false}}
{"id":"memory-preference","text":"Remember that I prefer concise answers in the terminal.","expected":{"memory_candidate":"user_preference","tool_needed":false,"safety_confirmation_required":false}}
{"id":"coding-debug","text":"Debug the failing pytest suite and inspect the git diff before opening a PR.","expected":{"tool_needed":true,"workflow_category":"debugging","urgency":"normal"}}
{"id":"devops-urgent","text":"Urgent: the embeddings service on port 18817 is down; check systemd logs and restore it.","expected":{"tool_needed":true,"workflow_category":"devops","urgency":"high"}}
{"id":"safety-routing","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","expected":{"tool_needed":true,"workflow_category":"devops","safety_confirmation_required":true}}
{"id":"destructive-reindex","text":"Delete the existing Chroma collection and reindex the Obsidian vault in place.","expected":{"tool_needed":true,"workflow_category":"note_taking","safety_confirmation_required":true}}
{"id":"research","text":"Research current OpenVINO NPU support for TinyBERT sequence classification and summarize sources.","expected":{"tool_needed":true,"workflow_category":"research"}}
{"id":"smart-home","text":"Turn off the living room lights and set the thermostat to 68.","expected":{"tool_needed":true,"workflow_category":"smart_home"}}
{"id":"media","text":"Transcribe this voice memo and extract action items.","expected":{"tool_needed":true,"workflow_category":"media"}}
{"id":"kanban","text":"Work kanban task t_5e123496 and block it if review is required.","expected":{"tool_needed":true,"workflow_category":"kanban"}}
@@ -0,0 +1,18 @@
[Unit]
Description=Atlas/Hermes dry-run OpenVINO router classifier
After=network.target openvino-embeddings.service
Wants=openvino-embeddings.service
[Service]
Type=simple
WorkingDirectory=/home/will/lab/swarm/openvino-classifier-npu
Environment=OPENVINO_CLASSIFIER_HOST=127.0.0.1
Environment=OPENVINO_CLASSIFIER_PORT=18819
Environment=OPENVINO_CLASSIFIER_EMBED_URL=http://127.0.0.1:18817/v1/embeddings
Environment=OPENVINO_CLASSIFIER_MAX_BATCH_SIZE=32
ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-classifier-npu/router_classifier.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=default.target
@@ -0,0 +1,563 @@
#!/usr/bin/env python3
"""Dry-run Atlas/Hermes router classifier backed by the local OpenVINO NPU embedding service.
Default port: 18819
Default upstream: http://127.0.0.1:18817/v1/embeddings
This service is intentionally advisory only. It does not write memory, mutate routing,
restart services, or call external APIs. NPU execution is proved by the upstream
embedding service's npu_busy_delta_us and by reading the local sysfs busy counter.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import re
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
VERSION = "0.1.0"
SERVICE = "atlas-router-classifier"
MODEL = "bge-base-en-v1.5-int8-ov/prototype-router-v0"
DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 18819
DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
DEFAULT_MAX_BATCH_SIZE = 32
NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
WORKFLOW_CATEGORIES = [
"chat",
"research",
"coding",
"debugging",
"devops",
"smart_home",
"media",
"note_taking",
"productivity",
"kanban",
"unknown",
]
MEMORY_VALUES = ["none", "user_preference", "durable_user_fact", "environment_fact", "workflow_convention", "skill_candidate"]
URGENCY_VALUES = ["low", "normal", "high", "critical"]
PROTOTYPES: dict[str, list[str]] = {
"tool_needed": [
"check the current date time weather news versions or live facts",
"inspect files git branches logs ports processes disk memory or system state",
"send a message create a cron job call an API or interact with a local service",
"search the web browse a website download or verify current information",
],
"memory_user_preference": [
"remember that I prefer concise replies and a direct style",
"my preference is use short answers and avoid unnecessary detail",
"please remember I like this convention for future sessions",
],
"memory_durable_user_fact": [
"remember that I live in Seattle and work on local AI infrastructure",
"my name role location identity or durable personal detail is",
],
"memory_environment_fact": [
"this project uses pytest and this server runs linux with openvino npu",
"remember this repository convention service port path or environment setup",
],
"memory_workflow_convention": [
"for this workflow use this recurring procedure convention or process",
"the team convention is to run checks before code review and use a worktree",
],
"memory_skill_candidate": [
"we discovered a reusable multi step workflow that should become a skill",
"save this procedure as a reusable skill after solving a tricky task",
],
"urgency_low": [
"whenever convenient no rush low priority idea someday backlog",
],
"urgency_high": [
"urgent asap high priority today please handle soon production issue",
"service is degraded broken failing down users are blocked",
],
"urgency_critical": [
"critical outage security incident data loss production down emergency now",
"stop the bleeding rollback immediately credentials leaked destructive incident",
],
"workflow_chat": [
"answer a general question explain a concept brainstorm rewrite text chat casually",
],
"workflow_research": [
"research compare summarize sources papers market docs web search literature review",
],
"workflow_coding": [
"implement code write tests refactor add feature fix type errors create a branch",
],
"workflow_debugging": [
"debug failing tests inspect logs reproduce error traceback diagnose regression",
],
"workflow_devops": [
"operate services systemd docker kubernetes ports health checks deploy infrastructure",
],
"workflow_smart_home": [
"turn on lights adjust thermostat control tv speaker home assistant hue wiz",
],
"workflow_media": [
"transcribe audio process video image gif spotify music youtube media file",
],
"workflow_note_taking": [
"obsidian notes daily diary memory knowledge base document personal context",
],
"workflow_productivity": [
"calendar email spreadsheet presentation notion airtable linear task planning",
],
"workflow_kanban": [
"kanban task board card assignee handoff review required blocked complete worker",
],
}
RULES: dict[str, list[tuple[re.Pattern[str], str, float]]] = {
"tool_needed": [
(re.compile(r"\b(current|today|now|latest|weather|news|version|price|stock)\b", re.I), "current_fact_requested", 0.88),
(re.compile(r"\b(file|directory|git|branch|commit|diff|log|port|process|disk|memory|cpu|gpu|npu|service|systemd|reindex)\b", re.I), "local_state_requested", 0.84),
(re.compile(r"\b(send|schedule|create cron|call api|download|browse|search web|open website|turn on|turn off|set the thermostat|transcribe|restart|switch primary routing|work kanban|kanban task)\b", re.I), "external_or_tool_action_requested", 0.86),
],
"safety": [
(re.compile(r"\b(delete|remove|overwrite|drop|truncate|wipe|reindex|reset --hard|force push)\b", re.I), "destructive_or_irreversible_action", 0.92),
(re.compile(r"\b(restart|stop|deploy|expose|public|0\.0\.0\.0|route live|primary routing|gateway)\b", re.I), "live_service_or_routing_change", 0.88),
(re.compile(r"\b(secret|token|api key|credential|password|private document|external upload|send message|spend money|purchase)\b", re.I), "credential_privacy_or_external_side_effect", 0.9),
],
"memory": [
(re.compile(r"\b(remember that|please remember|don'?t forget|my preference|I prefer|call me)\b", re.I), "explicit_memory_language", 0.9),
(re.compile(r"\b(always|for future|going forward|convention|workflow|standard practice)\b", re.I), "durable_convention_language", 0.78),
],
"urgency_high": [
(re.compile(r"\b(urgent|asap|immediately|high priority|production|down|broken|blocked)\b", re.I), "urgent_language", 0.84),
],
"urgency_critical": [
(re.compile(r"\b(critical|emergency|outage|data loss|credential leak|security incident|prod down)\b", re.I), "critical_incident_language", 0.94),
],
}
def npu_busy_time_us() -> int | None:
try:
return int(NPU_BUSY_FILE.read_text().strip())
except Exception:
return None
def env_int(name: str, default: int) -> int:
raw = os.environ.get(name)
if raw is None:
return default
try:
return int(raw)
except ValueError as exc:
raise SystemExit(f"{name} must be an integer, got {raw!r}") from exc
def env_float(name: str, default: float) -> float:
raw = os.environ.get(name)
if raw is None:
return default
try:
return float(raw)
except ValueError as exc:
raise SystemExit(f"{name} must be a number, got {raw!r}") from exc
def clamp01(value: float) -> float:
return max(0.0, min(1.0, value))
def cosine(a: list[float], b: list[float]) -> float:
if not a or not b or len(a) != len(b):
return 0.0
dot = sum(x * y for x, y in zip(a, b))
na = math.sqrt(sum(x * x for x in a))
nb = math.sqrt(sum(y * y for y in b))
if na == 0.0 or nb == 0.0:
return 0.0
# Map [-1, 1] to [0, 1] for confidence-like scoring.
return clamp01((dot / (na * nb) + 1.0) / 2.0)
def best_rule(text: str, group: str) -> tuple[float, list[str], list[dict[str, Any]]]:
best = 0.0
codes: list[str] = []
evidence: list[dict[str, Any]] = []
for pattern, code, score in RULES.get(group, []):
match = pattern.search(text)
if match:
best = max(best, score)
codes.append(code)
evidence.append({"label": group, "source": "rule", "matched": match.group(0), "reason_code": code, "score": score})
return best, sorted(set(codes)), evidence
@dataclass
class EmbedResult:
vectors: list[list[float]]
npu_busy_delta_us: int | None
duration_ms: float
embedding_dim: int | None
class EmbeddingClient:
def __init__(self, url: str, timeout_s: float = 30.0) -> None:
self.url = url
self.timeout_s = timeout_s
def embed(self, texts: list[str], *, purpose: str = "query") -> EmbedResult:
payload = json.dumps({"input": texts, "purpose": purpose}).encode("utf-8")
request = urllib.request.Request(
self.url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
started = time.perf_counter()
try:
with urllib.request.urlopen(request, timeout=self.timeout_s) as response: # noqa: S310 - local configured URL
body = response.read().decode("utf-8", "replace")
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", "replace")
raise RuntimeError(f"embedding service HTTP {exc.code}: {detail}") from exc
except urllib.error.URLError as exc:
raise RuntimeError(f"embedding service unavailable at {self.url}: {exc.reason}") from exc
data = json.loads(body)
vectors = [item["embedding"] for item in data.get("data", [])]
return EmbedResult(
vectors=[[float(x) for x in vec] for vec in vectors],
npu_busy_delta_us=data.get("npu_busy_delta_us"),
duration_ms=round((time.perf_counter() - started) * 1000, 3),
embedding_dim=data.get("embedding_dim") or (len(vectors[0]) if vectors else None),
)
class ClassifierService:
def __init__(self, embed_url: str, *, timeout_s: float = 30.0, max_batch_size: int = DEFAULT_MAX_BATCH_SIZE) -> None:
self.embed_url = embed_url
self.client = EmbeddingClient(embed_url, timeout_s=timeout_s)
self.max_batch_size = max(1, int(max_batch_size))
self.loaded_at = time.time()
self.prototype_texts: list[str] = []
self.prototype_keys: list[str] = []
for key, examples in PROTOTYPES.items():
for example in examples:
self.prototype_keys.append(key)
self.prototype_texts.append(example)
self.prototype_vectors: list[list[float]] | None = None
self.prototype_npu_busy_delta_us: int | None = None
self.embedding_dim: int | None = None
self.warnings: list[str] = []
def warmup(self) -> None:
result = self.client.embed(self.prototype_texts, purpose="document")
self.prototype_vectors = result.vectors
self.prototype_npu_busy_delta_us = result.npu_busy_delta_us
self.embedding_dim = result.embedding_dim
if not result.npu_busy_delta_us or result.npu_busy_delta_us <= 0:
self.warnings.append("prototype embedding warmup did not report positive NPU busy delta")
def health(self) -> dict[str, Any]:
return {
"status": "ok" if self.prototype_vectors else "starting",
"service": SERVICE,
"version": VERSION,
"mode": "dry_run",
"model": MODEL,
"embed_url": self.embed_url,
"device": "NPU-via-embedding-service",
"labels": ["tool_needed", "memory_candidate", "urgency", "workflow_category", "safety_confirmation_required"],
"embedding_dim": self.embedding_dim,
"prototype_count": len(self.prototype_texts),
"max_batch_size": self.max_batch_size,
"prototype_npu_busy_delta_us": self.prototype_npu_busy_delta_us,
"npu_busy_time_us": npu_busy_time_us(),
"uptime_s": round(time.time() - self.loaded_at, 3),
"warnings": self.warnings,
}
def labels(self) -> dict[str, Any]:
return {
"model": MODEL,
"thresholds": {
"tool_needed": 0.72,
"memory_candidate": 0.78,
"safety_confirmation_required": 0.80,
"workflow_category": 0.52,
},
"enums": {"memory_candidate": MEMORY_VALUES, "urgency": URGENCY_VALUES, "workflow_category": WORKFLOW_CATEGORIES},
"limits": {"max_batch_size": self.max_batch_size},
"prototype_ids": sorted(PROTOTYPES),
}
def classify(self, item_id: str | None, text: str, options: dict[str, Any] | None = None) -> dict[str, Any]:
if self.prototype_vectors is None:
self.warmup()
options = options or {}
include_evidence = bool(options.get("include_evidence", True))
include_embedding_debug = bool(options.get("include_embedding_debug", False))
dry_run = bool(options.get("dry_run", True))
started = time.perf_counter()
text = str(text or "")
if not text.strip():
raise ValueError("text must be a non-empty string")
sysfs_before = npu_busy_time_us()
embedded = self.client.embed([text], purpose="query")
sysfs_after = npu_busy_time_us()
if not embedded.vectors:
raise RuntimeError("embedding service returned no vectors")
message_vec = embedded.vectors[0]
similarities = self._prototype_scores(message_vec)
evidence: list[dict[str, Any]] = []
labels: dict[str, Any] = {}
tool_rule, tool_codes, tool_evidence = best_rule(text, "tool_needed")
tool_proto = max([similarities.get("tool_needed", 0.0)], default=0.0)
# Similarity alone is too broad for action classification; require either
# a deterministic rule hit or a very strong prototype match.
tool_conf = round(max(tool_rule, tool_proto if tool_proto >= 0.88 else 0.0), 3)
labels["tool_needed"] = {"value": tool_conf >= 0.72, "confidence": tool_conf, "threshold": 0.72, "reason_codes": tool_codes}
evidence.extend(tool_evidence)
if tool_proto > 0:
evidence.append({"label": "tool_needed", "source": "prototype_similarity", "prototype": "tool_needed", "score": round(tool_proto, 3)})
mem_label, mem_conf, mem_codes, mem_ev = self._memory_label(text, similarities)
labels["memory_candidate"] = {"value": mem_label, "confidence": round(mem_conf, 3), "threshold": 0.78, "reason_codes": mem_codes}
evidence.extend(mem_ev)
urgency_value, urgency_conf, urgency_scores, urgency_codes, urgency_ev = self._urgency_label(text, similarities)
labels["urgency"] = {"value": urgency_value, "confidence": round(urgency_conf, 3), "scores": {k: round(v, 3) for k, v in urgency_scores.items()}, "reason_codes": urgency_codes}
evidence.extend(urgency_ev)
workflow_value, workflow_conf, workflow_scores, workflow_ev = self._workflow_label(similarities, text)
labels["workflow_category"] = {"value": workflow_value, "confidence": round(workflow_conf, 3), "scores": {k: round(v, 3) for k, v in workflow_scores.items()}}
evidence.extend(workflow_ev)
safety_rule, safety_codes, safety_evidence = best_rule(text, "safety")
safety_proto = 0.0
safety_conf = round(max(safety_rule, safety_proto), 3)
labels["safety_confirmation_required"] = {"value": safety_conf >= 0.80, "confidence": safety_conf, "threshold": 0.80, "reason_codes": safety_codes}
evidence.extend(safety_evidence)
npu_delta = embedded.npu_busy_delta_us
sysfs_delta = None if sysfs_before is None or sysfs_after is None else sysfs_after - sysfs_before
warnings = list(self.warnings)
if not npu_delta or npu_delta <= 0:
warnings.append("embedding call did not report positive npu_busy_delta_us; NPU execution not proven for this request")
if sysfs_delta is not None and sysfs_delta <= 0:
warnings.append("sysfs npu_busy_time_us did not increase during classification request")
response: dict[str, Any] = {
"id": item_id,
"model": MODEL,
"created": int(time.time()),
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"npu_busy_delta_us": npu_delta,
"sysfs_npu_busy_delta_us": sysfs_delta,
"dry_run": dry_run,
"labels": labels,
"warnings": warnings,
}
if include_evidence:
response["evidence"] = evidence[:30]
if include_embedding_debug:
response["embedding_debug"] = {"embedding_dim": len(message_vec), "prototype_scores": {k: round(v, 3) for k, v in similarities.items()}}
return response
def batch_classify(self, items: list[dict[str, Any]], options: dict[str, Any] | None = None) -> dict[str, Any]:
if not items:
raise ValueError("items must contain at least one classification request")
if len(items) > self.max_batch_size:
raise ValueError(f"items exceeds max_batch_size={self.max_batch_size}")
started = time.perf_counter()
results = [self.classify(item.get("id"), str(item.get("text") or ""), options) for item in items]
return {
"model": MODEL,
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"npu_busy_delta_us": sum((r.get("npu_busy_delta_us") or 0) for r in results),
"results": results,
}
def _prototype_scores(self, vec: list[float]) -> dict[str, float]:
assert self.prototype_vectors is not None
scores: dict[str, float] = {}
for key, prototype_vec in zip(self.prototype_keys, self.prototype_vectors):
scores[key] = max(scores.get(key, 0.0), cosine(vec, prototype_vec))
return scores
def _memory_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, list[str], list[dict[str, Any]]]:
rule_score, codes, evidence = best_rule(text, "memory")
candidates = {
"user_preference": scores.get("memory_user_preference", 0.0),
"durable_user_fact": scores.get("memory_durable_user_fact", 0.0),
"environment_fact": scores.get("memory_environment_fact", 0.0),
"workflow_convention": scores.get("memory_workflow_convention", 0.0),
"skill_candidate": scores.get("memory_skill_candidate", 0.0),
}
label, proto_score = max(candidates.items(), key=lambda kv: kv[1])
confidence = max(proto_score, rule_score)
explicit_memory = rule_score >= 0.78
durable_fact_hint = bool(re.search(r"\b(project uses|repo uses|environment uses|runs on|standard practice|convention|workflow convention)\b", text, re.I))
if explicit_memory:
if re.search(r"\b(prefer|preference|call me|my name|I live|I am)\b", text, re.I):
label = "user_preference" if re.search(r"\b(prefer|preference)\b", text, re.I) else "durable_user_fact"
elif durable_fact_hint:
label = "environment_fact"
elif re.search(r"\b(skill|procedure|workflow)\b", text, re.I):
label = "skill_candidate"
# BGE prototype similarities are advisory but broad; avoid recommending
# memory writes from similarity alone unless the text also has durable-
# fact language or an unusually strong prototype match.
if confidence < 0.78 or (not explicit_memory and not durable_fact_hint and proto_score < 0.88):
label = "none"
else:
evidence.append({"label": "memory_candidate", "source": "prototype_similarity", "prototype": f"memory_{label}", "score": round(proto_score, 3)})
return label, confidence if label != "none" else max(0.0, min(confidence, 0.77)), codes, evidence
def _urgency_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, dict[str, float], list[str], list[dict[str, Any]]]:
high_rule, high_codes, high_ev = best_rule(text, "urgency_high")
critical_rule, critical_codes, critical_ev = best_rule(text, "urgency_critical")
low_rule = 0.82 if re.search(r"\b(no rush|whenever convenient|low priority|someday|backlog)\b", text, re.I) else 0.0
# Urgency is safety-sensitive for notifications, so require explicit
# language instead of relying on broad prototype similarity.
score_map = {
# Urgency should be explicit; broad embedding similarity otherwise
# turns neutral requests such as "what time is it" into low/high/critical urgency.
"low": low_rule,
"normal": 0.68,
"high": high_rule,
"critical": critical_rule,
}
if score_map["critical"] >= 0.9:
score_map["normal"] = 0.05
elif score_map["high"] >= 0.8 or score_map["low"] >= 0.8:
score_map["normal"] = 0.2
value, confidence = max(score_map.items(), key=lambda kv: kv[1])
evidence = high_ev + critical_ev
return value, confidence, score_map, sorted(set(high_codes + critical_codes)), evidence
def _workflow_label(self, scores: dict[str, float], text: str = "") -> tuple[str, float, dict[str, float], list[dict[str, Any]]]:
score_map = {category: scores.get(f"workflow_{category}", 0.0) for category in WORKFLOW_CATEGORIES if category != "unknown"}
rule_patterns: list[tuple[str, str]] = [
("chat", r"\bwhat time is it|what date is it|general question\b"),
("kanban", r"\bkanban|task card|review-required|blocked\b"),
("smart_home", r"\blights?|thermostat|home assistant|hue|wiz\b"),
("media", r"\btranscribe|voice memo|audio|video|image|spotify|youtube\b"),
("research", r"\bresearch|compare sources|papers?|literature|web search\b"),
("devops", r"\bsystemd|docker|kubernetes|service|ports?|gateway|deploy|infrastructure\b"),
("debugging", r"\bdebug|failing|traceback|logs?|reproduce|diagnose\b"),
("coding", r"\bimplement|code|pytest|refactor|feature|PR\b"),
("note_taking", r"\bobsidian|notes?|memory|diary|chroma|reindex\b"),
("productivity", r"\bcalendar|email|spreadsheet|presentation|notion|airtable|linear\b"),
]
rule_value: str | None = None
for category, pattern in rule_patterns:
if re.search(pattern, text, re.I):
rule_value = category
break
if rule_value:
value = rule_value
confidence = max(0.86, score_map.get(rule_value, 0.0))
score_map[rule_value] = confidence
source = "rule"
else:
value, confidence = max(score_map.items(), key=lambda kv: kv[1])
source = "prototype_similarity"
if confidence < 0.52:
value = "unknown"
confidence = 0.52
score_map["unknown"] = 1.0 - confidence if value != "unknown" else confidence
evidence = [{"label": "workflow_category", "source": source, "prototype": f"workflow_{value}", "score": round(confidence, 3)}]
return value, confidence, score_map, evidence
class Handler(BaseHTTPRequestHandler):
server_version = "AtlasRouterClassifier/0.1"
@property
def svc(self) -> ClassifierService:
return self.server.classifier_service # type: ignore[attr-defined]
def do_GET(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
if path in {"/", "/healthz", "/readyz", "/health"}:
self.write_json(self.svc.health())
elif path == "/v1/labels":
self.write_json(self.svc.labels())
else:
self.write_json({"error": "not found"}, status=404)
def do_POST(self) -> None:
path = self.path.split("?", 1)[0].rstrip("/") or "/"
try:
payload = self.read_json()
options = payload.get("options") if isinstance(payload.get("options"), dict) else {}
if path == "/v1/classify":
self.write_json(self.svc.classify(payload.get("id"), str(payload.get("text") or ""), options))
elif path == "/v1/batch_classify":
items = payload.get("items")
if not isinstance(items, list):
raise ValueError("items must be a list")
self.write_json(self.svc.batch_classify(items, options))
else:
self.write_json({"error": "not found"}, status=404)
except ValueError as exc:
self.write_json({"error": str(exc)}, status=400)
except Exception as exc:
self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500)
def read_json(self) -> dict[str, Any]:
length = int(self.headers.get("Content-Length") or 0)
body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}"
payload = json.loads(body or "{}")
if not isinstance(payload, dict):
raise ValueError("JSON body must be an object")
return payload
def write_json(self, payload: dict[str, Any], status: int = 200) -> None:
body = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name
print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True)
def main() -> int:
parser = argparse.ArgumentParser(description="Dry-run Atlas/Hermes router classifier")
parser.add_argument("--host", default=os.environ.get("OPENVINO_CLASSIFIER_HOST", DEFAULT_HOST))
parser.add_argument("--port", type=int, default=env_int("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT))
parser.add_argument("--embed-url", default=os.environ.get("OPENVINO_CLASSIFIER_EMBED_URL", DEFAULT_EMBED_URL))
parser.add_argument("--timeout-s", type=float, default=env_float("OPENVINO_CLASSIFIER_TIMEOUT_S", 30.0))
parser.add_argument("--max-batch-size", type=int, default=env_int("OPENVINO_CLASSIFIER_MAX_BATCH_SIZE", DEFAULT_MAX_BATCH_SIZE))
parser.add_argument("--no-warmup", action="store_true", help="skip prototype embedding warmup until first request")
args = parser.parse_args()
service = ClassifierService(args.embed_url, timeout_s=args.timeout_s, max_batch_size=args.max_batch_size)
if not args.no_warmup:
service.warmup()
httpd = ThreadingHTTPServer((args.host, args.port), Handler)
httpd.classifier_service = service # type: ignore[attr-defined]
print(f"{SERVICE} listening on {args.host}:{args.port} embed_url={args.embed_url} mode=dry_run", flush=True)
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
return 0
if __name__ == "__main__":
raise SystemExit(main())
+113
View File
@@ -0,0 +1,113 @@
#!/usr/bin/env python3
"""Local-only smoke test for the dry-run OpenVINO router classifier.
This script uses only synthetic fixture messages. It assumes router_classifier.py is
already running on localhost and never installs/enables a persistent service.
"""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
DEFAULT_BASE_URL = "http://127.0.0.1:18819"
BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
FIXTURE = Path(__file__).resolve().parent / "fixtures" / "atlas_hermes_messages.jsonl"
def npu_busy_time_us() -> int | None:
try:
return int(BUSY_FILE.read_text().strip())
except Exception:
return None
def get_json(url: str, timeout_s: float) -> dict[str, Any]:
with urllib.request.urlopen(url, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL
return json.loads(response.read().decode("utf-8"))
def post_json(url: str, payload: dict[str, Any], timeout_s: float) -> dict[str, Any]:
request = urllib.request.Request(
url,
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout_s) as response: # noqa: S310 - localhost smoke URL
return json.loads(response.read().decode("utf-8"))
def load_fixture(limit: int) -> list[dict[str, Any]]:
rows = [json.loads(line) for line in FIXTURE.read_text().splitlines() if line.strip()]
return rows[:limit]
def assert_expected(result: dict[str, Any], expected: dict[str, Any]) -> list[str]:
failures: list[str] = []
labels = result.get("labels", {})
for key, value in expected.items():
actual_label = labels.get(key, {})
actual_value = actual_label.get("value")
if actual_value != value:
failures.append(f"{result.get('id')}: {key} expected {value!r}, got {actual_value!r}")
return failures
def main() -> int:
parser = argparse.ArgumentParser(description="Smoke-test a running localhost router classifier")
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--timeout-s", type=float, default=30.0)
parser.add_argument("--limit", type=int, default=10)
args = parser.parse_args()
if not args.base_url.startswith("http://127.0.0.1:") and not args.base_url.startswith("http://localhost:"):
raise SystemExit("refusing non-local base URL; this smoke is localhost-only")
before = npu_busy_time_us()
started = time.perf_counter()
try:
health = get_json(f"{args.base_url.rstrip('/')}/healthz", args.timeout_s)
labels = get_json(f"{args.base_url.rstrip('/')}/v1/labels", args.timeout_s)
rows = load_fixture(args.limit)
results = []
failures: list[str] = []
for row in rows:
result = post_json(
f"{args.base_url.rstrip('/')}/v1/classify",
{"id": row["id"], "text": row["text"], "options": {"include_evidence": False, "dry_run": True}},
args.timeout_s,
)
results.append(result)
failures.extend(assert_expected(result, row.get("expected", {})))
after = npu_busy_time_us()
except urllib.error.URLError as exc:
raise SystemExit(f"smoke failed: {exc}") from exc
response_npu_delta = sum((r.get("npu_busy_delta_us") or 0) for r in results)
outer_sysfs_delta = None if before is None or after is None else after - before
npu_proven = response_npu_delta > 0 and (outer_sysfs_delta is None or outer_sysfs_delta > 0)
summary = {
"ok": not failures,
"service": health.get("service"),
"mode": health.get("mode"),
"model": health.get("model"),
"label_count": len(labels.get("prototype_ids", [])),
"fixture_count": len(results),
"duration_ms": round((time.perf_counter() - started) * 1000, 3),
"response_npu_busy_delta_us": response_npu_delta,
"outer_sysfs_npu_busy_delta_us": outer_sysfs_delta,
"npu_proven": npu_proven,
"failures": failures,
}
print(json.dumps(summary, indent=2, sort_keys=True))
return 0 if not failures and npu_proven else 1
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,121 @@
#!/usr/bin/env python3
from __future__ import annotations
import importlib.util
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
MODULE_PATH = ROOT / "router_classifier.py"
spec = importlib.util.spec_from_file_location("router_classifier", MODULE_PATH)
assert spec and spec.loader
router_classifier = importlib.util.module_from_spec(spec)
sys.modules["router_classifier"] = router_classifier
spec.loader.exec_module(router_classifier)
class FakeClient:
def embed(self, texts, *, purpose="query"):
# Deterministic toy embeddings based on keyword buckets. The tests focus on
# rule safety and API shape; live smoke tests cover the real NPU upstream.
vectors = []
for text in texts:
t = text.lower()
vec = [0.0] * 8
if any(w in t for w in ["time", "current", "weather", "news", "port", "git", "logs", "systemd"]):
vec[0] = 1.0
if any(w in t for w in ["remember", "prefer", "preference"]):
vec[1] = 1.0
if any(w in t for w in ["urgent", "down", "outage", "critical"]):
vec[2] = 1.0
if any(w in t for w in ["code", "pytest", "debug", "git", "diff"]):
vec[3] = 1.0
if any(w in t for w in ["service", "systemd", "port", "gateway", "docker"]):
vec[4] = 1.0
if any(w in t for w in ["kanban", "task", "blocked", "review"]):
vec[5] = 1.0
if any(w in t for w in ["light", "thermostat"]):
vec[6] = 1.0
if any(w in t for w in ["transcribe", "voice", "memo", "audio"]):
vec[7] = 1.0
if not any(vec):
vec[0] = 0.2
vectors.append(vec)
return router_classifier.EmbedResult(vectors=vectors, npu_busy_delta_us=123, duration_ms=1.0, embedding_dim=8)
class RouterClassifierTests(unittest.TestCase):
def service(self):
svc = router_classifier.ClassifierService("http://fake.local/v1/embeddings")
svc.client = FakeClient()
svc.warmup()
return svc
def test_health_and_label_schema(self):
svc = self.service()
health = svc.health()
self.assertEqual(health["service"], "atlas-router-classifier")
self.assertEqual(health["mode"], "dry_run")
self.assertIn("tool_needed", health["labels"])
labels = svc.labels()
self.assertIn("workflow_category", labels["enums"])
self.assertIn("safety_confirmation_required", labels["thresholds"])
def test_explicit_preference_is_memory_candidate(self):
result = self.service().classify("pref", "Remember that I prefer concise terminal replies.")
self.assertEqual(result["labels"]["memory_candidate"]["value"], "user_preference")
self.assertGreaterEqual(result["labels"]["memory_candidate"]["confidence"], 0.78)
self.assertFalse(result["labels"]["safety_confirmation_required"]["value"])
def test_current_local_state_needs_tool(self):
result = self.service().classify("port", "Check whether port 18819 is listening and inspect systemd logs.")
self.assertTrue(result["labels"]["tool_needed"]["value"])
self.assertIn("local_state_requested", result["labels"]["tool_needed"]["reason_codes"])
def test_live_gateway_restart_requires_confirmation(self):
result = self.service().classify("safe", "Restart the live Atlas gateway and switch primary routing.")
self.assertTrue(result["labels"]["safety_confirmation_required"]["value"])
self.assertIn("live_service_or_routing_change", result["labels"]["safety_confirmation_required"]["reason_codes"])
def test_batch_shape(self):
result = self.service().batch_classify([
{"id": "a", "text": "What time is it?"},
{"id": "b", "text": "Delete the existing collection and reindex it in place."},
])
self.assertEqual(result["model"], router_classifier.MODEL)
self.assertEqual(len(result["results"]), 2)
self.assertGreater(result["npu_busy_delta_us"], 0)
def test_batch_limits_are_enforced(self):
svc = self.service()
with self.assertRaisesRegex(ValueError, "at least one"):
svc.batch_classify([])
too_many = [{"id": str(i), "text": "What time is it?"} for i in range(router_classifier.DEFAULT_MAX_BATCH_SIZE + 1)]
with self.assertRaisesRegex(ValueError, "max_batch_size"):
svc.batch_classify(too_many)
def test_fixture_file_is_valid_jsonl(self):
fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl"
rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()]
self.assertGreaterEqual(len(rows), 8)
for row in rows:
self.assertIn("id", row)
self.assertIn("text", row)
self.assertIn("expected", row)
def test_synthetic_fixture_expectations(self):
svc = self.service()
fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl"
rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()]
for row in rows:
with self.subTest(row=row["id"]):
result = svc.classify(row["id"], row["text"], {"include_evidence": False})
labels = result["labels"]
for label_name, expected_value in row["expected"].items():
self.assertEqual(labels[label_name]["value"], expected_value)
if __name__ == "__main__":
unittest.main()