From 4003198ba9ef14e98e80742068e3f6ab41de3d74 Mon Sep 17 00:00:00 2001 From: Hermes Agent Date: Thu, 4 Jun 2026 10:56:34 -0700 Subject: [PATCH] feat: add OpenVINO router classifier prototype --- openvino-classifier-npu/README.md | 122 ++++ .../fixtures/atlas_hermes_messages.jsonl | 10 + .../openvino-router-classifier.service | 17 + openvino-classifier-npu/router_classifier.py | 532 ++++++++++++++++++ .../tests/test_router_classifier.py | 102 ++++ 5 files changed, 783 insertions(+) create mode 100644 openvino-classifier-npu/README.md create mode 100644 openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl create mode 100644 openvino-classifier-npu/openvino-router-classifier.service create mode 100644 openvino-classifier-npu/router_classifier.py create mode 100644 openvino-classifier-npu/tests/test_router_classifier.py diff --git a/openvino-classifier-npu/README.md b/openvino-classifier-npu/README.md new file mode 100644 index 0000000..1d42223 --- /dev/null +++ b/openvino-classifier-npu/README.md @@ -0,0 +1,122 @@ +# OpenVINO NPU router classifier prototype + +Dry-run Atlas/Hermes message classifier/router prototype. + +It reuses the existing OpenVINO NPU embeddings service on `127.0.0.1:18817` and +serves an inspectable stdlib HTTP API on `127.0.0.1:18819`. It does not change +live Hermes/Atlas routing, write memory, mutate vector collections, restart +services, or send external messages. + +## Runtime shape + +- Service: `atlas-router-classifier` +- Default port: `18819` +- Default bind: `127.0.0.1` +- Upstream: `http://127.0.0.1:18817/v1/embeddings` +- Model label: `bge-base-en-v1.5-int8-ov/prototype-router-v0` +- NPU proof: `/sys/class/accel/accel0/device/npu_busy_time_us` before/after plus upstream `npu_busy_delta_us` + +The classifier uses deterministic high-precision rules for safety/urgency/tool +signals plus cosine similarity against curated embedding prototypes for workflow +and memory recommendations. This is intentionally tunable without model training. + +## API + +### GET `/healthz` + +Returns service metadata, labels, prototype count, NPU sysfs counter, and warmup +NPU delta. + +### GET `/v1/labels` + +Returns label enum values, thresholds, and prototype IDs without dumping private +fixtures. + +### POST `/v1/classify` + +Request: + +```json +{ + "id": "optional trace id", + "text": "User message or task body to classify.", + "context": {"platform": "cli", "source": "user"}, + "options": { + "include_evidence": true, + "include_embedding_debug": false, + "dry_run": true + } +} +``` + +Response includes: + +- `labels.tool_needed`: boolean, confidence, threshold, reason codes +- `labels.memory_candidate`: `none | user_preference | durable_user_fact | environment_fact | workflow_convention | skill_candidate` +- `labels.urgency`: `low | normal | high | critical` +- `labels.workflow_category`: `chat | research | coding | debugging | devops | smart_home | media | note_taking | productivity | kanban | unknown` +- `labels.safety_confirmation_required`: boolean, confidence, reason codes +- `npu_busy_delta_us` and `sysfs_npu_busy_delta_us` +- `evidence` when requested + +### POST `/v1/batch_classify` + +Request: + +```json +{ + "items": [{"id": "m1", "text": "What time is it?"}], + "options": {"include_evidence": false, "dry_run": true} +} +``` + +## Local smoke test + +Check that the proposed port is free first: + +```bash +ss -ltnp | grep ':18819' || true +``` + +Run without installing anything extra; `/home/will/.venvs/npu` already has the +stdlib plus requests/openvino stack used by the upstream embeddings service: + +```bash +cd /home/will/lab/swarm/openvino-classifier-npu +/home/will/.venvs/npu/bin/python router_classifier.py --host 127.0.0.1 --port 18819 +``` + +Then from another shell: + +```bash +curl -fsS http://127.0.0.1:18819/healthz | jq . +curl -fsS http://127.0.0.1:18819/v1/classify \ + -H 'Content-Type: application/json' \ + -d '{"id":"smoke","text":"Urgent: check whether port 18817 is listening and inspect systemd logs.","options":{"include_evidence":true}}' | jq . +``` + +A valid NPU-backed response must have positive `npu_busy_delta_us`; HTTP 200 by +itself is not considered proof. + +## Tests + +Unit tests use a fake embedding client and do not touch the NPU: + +```bash +/home/will/.venvs/npu/bin/python -m unittest discover -s openvino-classifier-npu/tests -v +``` + +Fixture messages live at `fixtures/atlas_hermes_messages.jsonl`. + +## Optional systemd user unit + +A draft unit is included as `openvino-router-classifier.service`. Install only +after review/approval: + +```bash +cp openvino-router-classifier.service ~/.config/systemd/user/openvino-router-classifier.service +systemctl --user daemon-reload +systemctl --user enable --now openvino-router-classifier.service +``` + +Do not enable it as part of this prototype task without explicit approval. diff --git a/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl b/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl new file mode 100644 index 0000000..fd95e95 --- /dev/null +++ b/openvino-classifier-npu/fixtures/atlas_hermes_messages.jsonl @@ -0,0 +1,10 @@ +{"id":"tool-time","text":"What time is it in Seattle right now?","expected":{"tool_needed":true,"workflow_category":"chat","urgency":"normal","safety_confirmation_required":false}} +{"id":"memory-preference","text":"Remember that I prefer concise answers in the terminal.","expected":{"memory_candidate":"user_preference","tool_needed":false,"safety_confirmation_required":false}} +{"id":"coding-debug","text":"Debug the failing pytest suite and inspect the git diff before opening a PR.","expected":{"tool_needed":true,"workflow_category":"debugging","urgency":"normal"}} +{"id":"devops-urgent","text":"Urgent: the embeddings service on port 18817 is down; check systemd logs and restore it.","expected":{"tool_needed":true,"workflow_category":"devops","urgency":"high"}} +{"id":"safety-routing","text":"Restart the live Atlas gateway and switch primary routing to the new classifier.","expected":{"tool_needed":true,"workflow_category":"devops","safety_confirmation_required":true}} +{"id":"destructive-reindex","text":"Delete the existing Chroma collection and reindex the Obsidian vault in place.","expected":{"tool_needed":true,"workflow_category":"note_taking","safety_confirmation_required":true}} +{"id":"research","text":"Research current OpenVINO NPU support for TinyBERT sequence classification and summarize sources.","expected":{"tool_needed":true,"workflow_category":"research"}} +{"id":"smart-home","text":"Turn off the living room lights and set the thermostat to 68.","expected":{"tool_needed":true,"workflow_category":"smart_home"}} +{"id":"media","text":"Transcribe this voice memo and extract action items.","expected":{"tool_needed":true,"workflow_category":"media"}} +{"id":"kanban","text":"Work kanban task t_5e123496 and block it if review is required.","expected":{"tool_needed":true,"workflow_category":"kanban"}} diff --git a/openvino-classifier-npu/openvino-router-classifier.service b/openvino-classifier-npu/openvino-router-classifier.service new file mode 100644 index 0000000..e537d47 --- /dev/null +++ b/openvino-classifier-npu/openvino-router-classifier.service @@ -0,0 +1,17 @@ +[Unit] +Description=Atlas/Hermes dry-run OpenVINO router classifier +After=network.target openvino-embeddings.service +Wants=openvino-embeddings.service + +[Service] +Type=simple +WorkingDirectory=/home/will/lab/swarm/openvino-classifier-npu +Environment=OPENVINO_CLASSIFIER_HOST=127.0.0.1 +Environment=OPENVINO_CLASSIFIER_PORT=18819 +Environment=OPENVINO_CLASSIFIER_EMBED_URL=http://127.0.0.1:18817/v1/embeddings +ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-classifier-npu/router_classifier.py +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target diff --git a/openvino-classifier-npu/router_classifier.py b/openvino-classifier-npu/router_classifier.py new file mode 100644 index 0000000..379cb29 --- /dev/null +++ b/openvino-classifier-npu/router_classifier.py @@ -0,0 +1,532 @@ +#!/usr/bin/env python3 +"""Dry-run Atlas/Hermes router classifier backed by the local OpenVINO NPU embedding service. + +Default port: 18819 +Default upstream: http://127.0.0.1:18817/v1/embeddings + +This service is intentionally advisory only. It does not write memory, mutate routing, +restart services, or call external APIs. NPU execution is proved by the upstream +embedding service's npu_busy_delta_us and by reading the local sysfs busy counter. +""" +from __future__ import annotations + +import argparse +import json +import math +import os +import re +import sys +import time +import urllib.error +import urllib.request +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any + +VERSION = "0.1.0" +SERVICE = "atlas-router-classifier" +MODEL = "bge-base-en-v1.5-int8-ov/prototype-router-v0" +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 18819 +DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings" +NPU_BUSY_FILE = Path("/sys/class/accel/accel0/device/npu_busy_time_us") + +WORKFLOW_CATEGORIES = [ + "chat", + "research", + "coding", + "debugging", + "devops", + "smart_home", + "media", + "note_taking", + "productivity", + "kanban", + "unknown", +] +MEMORY_VALUES = ["none", "user_preference", "durable_user_fact", "environment_fact", "workflow_convention", "skill_candidate"] +URGENCY_VALUES = ["low", "normal", "high", "critical"] + +PROTOTYPES: dict[str, list[str]] = { + "tool_needed": [ + "check the current date time weather news versions or live facts", + "inspect files git branches logs ports processes disk memory or system state", + "send a message create a cron job call an API or interact with a local service", + "search the web browse a website download or verify current information", + ], + "memory_user_preference": [ + "remember that I prefer concise replies and a direct style", + "my preference is use short answers and avoid unnecessary detail", + "please remember I like this convention for future sessions", + ], + "memory_durable_user_fact": [ + "remember that I live in Seattle and work on local AI infrastructure", + "my name role location identity or durable personal detail is", + ], + "memory_environment_fact": [ + "this project uses pytest and this server runs linux with openvino npu", + "remember this repository convention service port path or environment setup", + ], + "memory_workflow_convention": [ + "for this workflow use this recurring procedure convention or process", + "the team convention is to run checks before code review and use a worktree", + ], + "memory_skill_candidate": [ + "we discovered a reusable multi step workflow that should become a skill", + "save this procedure as a reusable skill after solving a tricky task", + ], + "urgency_low": [ + "whenever convenient no rush low priority idea someday backlog", + ], + "urgency_high": [ + "urgent asap high priority today please handle soon production issue", + "service is degraded broken failing down users are blocked", + ], + "urgency_critical": [ + "critical outage security incident data loss production down emergency now", + "stop the bleeding rollback immediately credentials leaked destructive incident", + ], + "workflow_chat": [ + "answer a general question explain a concept brainstorm rewrite text chat casually", + ], + "workflow_research": [ + "research compare summarize sources papers market docs web search literature review", + ], + "workflow_coding": [ + "implement code write tests refactor add feature fix type errors create a branch", + ], + "workflow_debugging": [ + "debug failing tests inspect logs reproduce error traceback diagnose regression", + ], + "workflow_devops": [ + "operate services systemd docker kubernetes ports health checks deploy infrastructure", + ], + "workflow_smart_home": [ + "turn on lights adjust thermostat control tv speaker home assistant hue wiz", + ], + "workflow_media": [ + "transcribe audio process video image gif spotify music youtube media file", + ], + "workflow_note_taking": [ + "obsidian notes daily diary memory knowledge base document personal context", + ], + "workflow_productivity": [ + "calendar email spreadsheet presentation notion airtable linear task planning", + ], + "workflow_kanban": [ + "kanban task board card assignee handoff review required blocked complete worker", + ], +} + +RULES: dict[str, list[tuple[re.Pattern[str], str, float]]] = { + "tool_needed": [ + (re.compile(r"\b(current|today|now|latest|weather|news|version|price|stock)\b", re.I), "current_fact_requested", 0.88), + (re.compile(r"\b(file|directory|git|branch|commit|diff|log|port|process|disk|memory|cpu|gpu|npu|service|systemd|reindex)\b", re.I), "local_state_requested", 0.84), + (re.compile(r"\b(send|schedule|create cron|call api|download|browse|search web|open website|turn on|turn off|set the thermostat|transcribe|restart|switch primary routing|work kanban|kanban task)\b", re.I), "external_or_tool_action_requested", 0.86), + ], + "safety": [ + (re.compile(r"\b(delete|remove|overwrite|drop|truncate|wipe|reindex|reset --hard|force push)\b", re.I), "destructive_or_irreversible_action", 0.92), + (re.compile(r"\b(restart|stop|deploy|expose|public|0\.0\.0\.0|route live|primary routing|gateway)\b", re.I), "live_service_or_routing_change", 0.88), + (re.compile(r"\b(secret|token|api key|credential|password|private document|external upload|send message|spend money|purchase)\b", re.I), "credential_privacy_or_external_side_effect", 0.9), + ], + "memory": [ + (re.compile(r"\b(remember that|please remember|don'?t forget|my preference|I prefer|call me)\b", re.I), "explicit_memory_language", 0.9), + (re.compile(r"\b(always|for future|going forward|convention|workflow|standard practice)\b", re.I), "durable_convention_language", 0.78), + ], + "urgency_high": [ + (re.compile(r"\b(urgent|asap|immediately|high priority|production|down|broken|blocked)\b", re.I), "urgent_language", 0.84), + ], + "urgency_critical": [ + (re.compile(r"\b(critical|emergency|outage|data loss|credential leak|security incident|prod down)\b", re.I), "critical_incident_language", 0.94), + ], +} + + +def npu_busy_time_us() -> int | None: + try: + return int(NPU_BUSY_FILE.read_text().strip()) + except Exception: + return None + + +def clamp01(value: float) -> float: + return max(0.0, min(1.0, value)) + + +def cosine(a: list[float], b: list[float]) -> float: + if not a or not b or len(a) != len(b): + return 0.0 + dot = sum(x * y for x, y in zip(a, b)) + na = math.sqrt(sum(x * x for x in a)) + nb = math.sqrt(sum(y * y for y in b)) + if na == 0.0 or nb == 0.0: + return 0.0 + # Map [-1, 1] to [0, 1] for confidence-like scoring. + return clamp01((dot / (na * nb) + 1.0) / 2.0) + + +def best_rule(text: str, group: str) -> tuple[float, list[str], list[dict[str, Any]]]: + best = 0.0 + codes: list[str] = [] + evidence: list[dict[str, Any]] = [] + for pattern, code, score in RULES.get(group, []): + match = pattern.search(text) + if match: + best = max(best, score) + codes.append(code) + evidence.append({"label": group, "source": "rule", "matched": match.group(0), "reason_code": code, "score": score}) + return best, sorted(set(codes)), evidence + + +@dataclass +class EmbedResult: + vectors: list[list[float]] + npu_busy_delta_us: int | None + duration_ms: float + embedding_dim: int | None + + +class EmbeddingClient: + def __init__(self, url: str, timeout_s: float = 30.0) -> None: + self.url = url + self.timeout_s = timeout_s + + def embed(self, texts: list[str], *, purpose: str = "query") -> EmbedResult: + payload = json.dumps({"input": texts, "purpose": purpose}).encode("utf-8") + request = urllib.request.Request( + self.url, + data=payload, + headers={"Content-Type": "application/json"}, + method="POST", + ) + started = time.perf_counter() + try: + with urllib.request.urlopen(request, timeout=self.timeout_s) as response: # noqa: S310 - local configured URL + body = response.read().decode("utf-8", "replace") + except urllib.error.HTTPError as exc: + detail = exc.read().decode("utf-8", "replace") + raise RuntimeError(f"embedding service HTTP {exc.code}: {detail}") from exc + except urllib.error.URLError as exc: + raise RuntimeError(f"embedding service unavailable at {self.url}: {exc.reason}") from exc + data = json.loads(body) + vectors = [item["embedding"] for item in data.get("data", [])] + return EmbedResult( + vectors=[[float(x) for x in vec] for vec in vectors], + npu_busy_delta_us=data.get("npu_busy_delta_us"), + duration_ms=round((time.perf_counter() - started) * 1000, 3), + embedding_dim=data.get("embedding_dim") or (len(vectors[0]) if vectors else None), + ) + + +class ClassifierService: + def __init__(self, embed_url: str, *, timeout_s: float = 30.0) -> None: + self.embed_url = embed_url + self.client = EmbeddingClient(embed_url, timeout_s=timeout_s) + self.loaded_at = time.time() + self.prototype_texts: list[str] = [] + self.prototype_keys: list[str] = [] + for key, examples in PROTOTYPES.items(): + for example in examples: + self.prototype_keys.append(key) + self.prototype_texts.append(example) + self.prototype_vectors: list[list[float]] | None = None + self.prototype_npu_busy_delta_us: int | None = None + self.embedding_dim: int | None = None + self.warnings: list[str] = [] + + def warmup(self) -> None: + result = self.client.embed(self.prototype_texts, purpose="document") + self.prototype_vectors = result.vectors + self.prototype_npu_busy_delta_us = result.npu_busy_delta_us + self.embedding_dim = result.embedding_dim + if not result.npu_busy_delta_us or result.npu_busy_delta_us <= 0: + self.warnings.append("prototype embedding warmup did not report positive NPU busy delta") + + def health(self) -> dict[str, Any]: + return { + "status": "ok" if self.prototype_vectors else "starting", + "service": SERVICE, + "version": VERSION, + "mode": "dry_run", + "model": MODEL, + "embed_url": self.embed_url, + "device": "NPU-via-embedding-service", + "labels": ["tool_needed", "memory_candidate", "urgency", "workflow_category", "safety_confirmation_required"], + "embedding_dim": self.embedding_dim, + "prototype_count": len(self.prototype_texts), + "prototype_npu_busy_delta_us": self.prototype_npu_busy_delta_us, + "npu_busy_time_us": npu_busy_time_us(), + "uptime_s": round(time.time() - self.loaded_at, 3), + "warnings": self.warnings, + } + + def labels(self) -> dict[str, Any]: + return { + "model": MODEL, + "thresholds": { + "tool_needed": 0.72, + "memory_candidate": 0.78, + "safety_confirmation_required": 0.80, + "workflow_category": 0.52, + }, + "enums": {"memory_candidate": MEMORY_VALUES, "urgency": URGENCY_VALUES, "workflow_category": WORKFLOW_CATEGORIES}, + "prototype_ids": sorted(PROTOTYPES), + } + + def classify(self, item_id: str | None, text: str, options: dict[str, Any] | None = None) -> dict[str, Any]: + if self.prototype_vectors is None: + self.warmup() + options = options or {} + include_evidence = bool(options.get("include_evidence", True)) + include_embedding_debug = bool(options.get("include_embedding_debug", False)) + dry_run = bool(options.get("dry_run", True)) + started = time.perf_counter() + text = str(text or "") + if not text.strip(): + raise ValueError("text must be a non-empty string") + + sysfs_before = npu_busy_time_us() + embedded = self.client.embed([text], purpose="query") + sysfs_after = npu_busy_time_us() + if not embedded.vectors: + raise RuntimeError("embedding service returned no vectors") + message_vec = embedded.vectors[0] + similarities = self._prototype_scores(message_vec) + + evidence: list[dict[str, Any]] = [] + labels: dict[str, Any] = {} + + tool_rule, tool_codes, tool_evidence = best_rule(text, "tool_needed") + tool_proto = max([similarities.get("tool_needed", 0.0)], default=0.0) + # Similarity alone is too broad for action classification; require either + # a deterministic rule hit or a very strong prototype match. + tool_conf = round(max(tool_rule, tool_proto if tool_proto >= 0.88 else 0.0), 3) + labels["tool_needed"] = {"value": tool_conf >= 0.72, "confidence": tool_conf, "threshold": 0.72, "reason_codes": tool_codes} + evidence.extend(tool_evidence) + if tool_proto > 0: + evidence.append({"label": "tool_needed", "source": "prototype_similarity", "prototype": "tool_needed", "score": round(tool_proto, 3)}) + + mem_label, mem_conf, mem_codes, mem_ev = self._memory_label(text, similarities) + labels["memory_candidate"] = {"value": mem_label, "confidence": round(mem_conf, 3), "threshold": 0.78, "reason_codes": mem_codes} + evidence.extend(mem_ev) + + urgency_value, urgency_conf, urgency_scores, urgency_codes, urgency_ev = self._urgency_label(text, similarities) + labels["urgency"] = {"value": urgency_value, "confidence": round(urgency_conf, 3), "scores": {k: round(v, 3) for k, v in urgency_scores.items()}, "reason_codes": urgency_codes} + evidence.extend(urgency_ev) + + workflow_value, workflow_conf, workflow_scores, workflow_ev = self._workflow_label(similarities, text) + labels["workflow_category"] = {"value": workflow_value, "confidence": round(workflow_conf, 3), "scores": {k: round(v, 3) for k, v in workflow_scores.items()}} + evidence.extend(workflow_ev) + + safety_rule, safety_codes, safety_evidence = best_rule(text, "safety") + safety_proto = 0.0 + safety_conf = round(max(safety_rule, safety_proto), 3) + labels["safety_confirmation_required"] = {"value": safety_conf >= 0.80, "confidence": safety_conf, "threshold": 0.80, "reason_codes": safety_codes} + evidence.extend(safety_evidence) + + npu_delta = embedded.npu_busy_delta_us + sysfs_delta = None if sysfs_before is None or sysfs_after is None else sysfs_after - sysfs_before + warnings = list(self.warnings) + if not npu_delta or npu_delta <= 0: + warnings.append("embedding call did not report positive npu_busy_delta_us; NPU execution not proven for this request") + if sysfs_delta is not None and sysfs_delta <= 0: + warnings.append("sysfs npu_busy_time_us did not increase during classification request") + + response: dict[str, Any] = { + "id": item_id, + "model": MODEL, + "created": int(time.time()), + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "npu_busy_delta_us": npu_delta, + "sysfs_npu_busy_delta_us": sysfs_delta, + "dry_run": dry_run, + "labels": labels, + "warnings": warnings, + } + if include_evidence: + response["evidence"] = evidence[:30] + if include_embedding_debug: + response["embedding_debug"] = {"embedding_dim": len(message_vec), "prototype_scores": {k: round(v, 3) for k, v in similarities.items()}} + return response + + def batch_classify(self, items: list[dict[str, Any]], options: dict[str, Any] | None = None) -> dict[str, Any]: + started = time.perf_counter() + results = [self.classify(item.get("id"), str(item.get("text") or ""), options) for item in items] + return { + "model": MODEL, + "duration_ms": round((time.perf_counter() - started) * 1000, 3), + "npu_busy_delta_us": sum((r.get("npu_busy_delta_us") or 0) for r in results), + "results": results, + } + + def _prototype_scores(self, vec: list[float]) -> dict[str, float]: + assert self.prototype_vectors is not None + scores: dict[str, float] = {} + for key, prototype_vec in zip(self.prototype_keys, self.prototype_vectors): + scores[key] = max(scores.get(key, 0.0), cosine(vec, prototype_vec)) + return scores + + def _memory_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, list[str], list[dict[str, Any]]]: + rule_score, codes, evidence = best_rule(text, "memory") + candidates = { + "user_preference": scores.get("memory_user_preference", 0.0), + "durable_user_fact": scores.get("memory_durable_user_fact", 0.0), + "environment_fact": scores.get("memory_environment_fact", 0.0), + "workflow_convention": scores.get("memory_workflow_convention", 0.0), + "skill_candidate": scores.get("memory_skill_candidate", 0.0), + } + label, proto_score = max(candidates.items(), key=lambda kv: kv[1]) + confidence = max(proto_score, rule_score) + explicit_memory = rule_score >= 0.78 + durable_fact_hint = bool(re.search(r"\b(project uses|repo uses|environment uses|runs on|standard practice|convention|workflow convention)\b", text, re.I)) + if explicit_memory: + if re.search(r"\b(prefer|preference|call me|my name|I live|I am)\b", text, re.I): + label = "user_preference" if re.search(r"\b(prefer|preference)\b", text, re.I) else "durable_user_fact" + elif durable_fact_hint: + label = "environment_fact" + elif re.search(r"\b(skill|procedure|workflow)\b", text, re.I): + label = "skill_candidate" + # BGE prototype similarities are advisory but broad; avoid recommending + # memory writes from similarity alone unless the text also has durable- + # fact language or an unusually strong prototype match. + if confidence < 0.78 or (not explicit_memory and not durable_fact_hint and proto_score < 0.88): + label = "none" + else: + evidence.append({"label": "memory_candidate", "source": "prototype_similarity", "prototype": f"memory_{label}", "score": round(proto_score, 3)}) + return label, confidence if label != "none" else max(0.0, min(confidence, 0.77)), codes, evidence + + def _urgency_label(self, text: str, scores: dict[str, float]) -> tuple[str, float, dict[str, float], list[str], list[dict[str, Any]]]: + high_rule, high_codes, high_ev = best_rule(text, "urgency_high") + critical_rule, critical_codes, critical_ev = best_rule(text, "urgency_critical") + low_rule = 0.82 if re.search(r"\b(no rush|whenever convenient|low priority|someday|backlog)\b", text, re.I) else 0.0 + # Urgency is safety-sensitive for notifications. Prefer explicit rules; + # use prototype scores only when they are unusually strong. + score_map = { + "low": max(low_rule, scores.get("urgency_low", 0.0) if scores.get("urgency_low", 0.0) >= 0.9 else 0.0), + "normal": 0.68, + "high": max(high_rule, scores.get("urgency_high", 0.0) if scores.get("urgency_high", 0.0) >= 0.9 else 0.0), + "critical": max(critical_rule, scores.get("urgency_critical", 0.0) if scores.get("urgency_critical", 0.0) >= 0.92 else 0.0), + } + if score_map["critical"] >= 0.9: + score_map["normal"] = 0.05 + elif score_map["high"] >= 0.8 or score_map["low"] >= 0.8: + score_map["normal"] = 0.2 + value, confidence = max(score_map.items(), key=lambda kv: kv[1]) + evidence = high_ev + critical_ev + return value, confidence, score_map, sorted(set(high_codes + critical_codes)), evidence + + def _workflow_label(self, scores: dict[str, float], text: str = "") -> tuple[str, float, dict[str, float], list[dict[str, Any]]]: + score_map = {category: scores.get(f"workflow_{category}", 0.0) for category in WORKFLOW_CATEGORIES if category != "unknown"} + rule_patterns: list[tuple[str, str]] = [ + ("chat", r"\bwhat time is it|what date is it|general question\b"), + ("kanban", r"\bkanban|task card|review-required|blocked\b"), + ("smart_home", r"\blights?|thermostat|home assistant|hue|wiz\b"), + ("media", r"\btranscribe|voice memo|audio|video|image|spotify|youtube\b"), + ("research", r"\bresearch|compare sources|papers?|literature|web search\b"), + ("devops", r"\bsystemd|docker|kubernetes|service|ports?|gateway|deploy|infrastructure\b"), + ("debugging", r"\bdebug|failing|traceback|logs?|reproduce|diagnose\b"), + ("coding", r"\bimplement|code|pytest|refactor|feature|PR\b"), + ("note_taking", r"\bobsidian|notes?|memory|diary|chroma|reindex\b"), + ("productivity", r"\bcalendar|email|spreadsheet|presentation|notion|airtable|linear\b"), + ] + rule_value: str | None = None + for category, pattern in rule_patterns: + if re.search(pattern, text, re.I): + rule_value = category + break + if rule_value: + value = rule_value + confidence = max(0.86, score_map.get(rule_value, 0.0)) + score_map[rule_value] = confidence + source = "rule" + else: + value, confidence = max(score_map.items(), key=lambda kv: kv[1]) + source = "prototype_similarity" + if confidence < 0.52: + value = "unknown" + confidence = 0.52 + score_map["unknown"] = 1.0 - confidence if value != "unknown" else confidence + evidence = [{"label": "workflow_category", "source": source, "prototype": f"workflow_{value}", "score": round(confidence, 3)}] + return value, confidence, score_map, evidence + + +class Handler(BaseHTTPRequestHandler): + server_version = "AtlasRouterClassifier/0.1" + + @property + def svc(self) -> ClassifierService: + return self.server.classifier_service # type: ignore[attr-defined] + + def do_GET(self) -> None: + path = self.path.split("?", 1)[0].rstrip("/") or "/" + if path in {"/", "/healthz", "/readyz", "/health"}: + self.write_json(self.svc.health()) + elif path == "/v1/labels": + self.write_json(self.svc.labels()) + else: + self.write_json({"error": "not found"}, status=404) + + def do_POST(self) -> None: + path = self.path.split("?", 1)[0].rstrip("/") or "/" + try: + payload = self.read_json() + options = payload.get("options") if isinstance(payload.get("options"), dict) else {} + if path == "/v1/classify": + self.write_json(self.svc.classify(payload.get("id"), str(payload.get("text") or ""), options)) + elif path == "/v1/batch_classify": + items = payload.get("items") + if not isinstance(items, list): + raise ValueError("items must be a list") + self.write_json(self.svc.batch_classify(items, options)) + else: + self.write_json({"error": "not found"}, status=404) + except ValueError as exc: + self.write_json({"error": str(exc)}, status=400) + except Exception as exc: + self.write_json({"error": f"{type(exc).__name__}: {exc}"}, status=500) + + def read_json(self) -> dict[str, Any]: + length = int(self.headers.get("Content-Length") or 0) + body = self.rfile.read(length).decode("utf-8", "replace") if length else "{}" + payload = json.loads(body or "{}") + if not isinstance(payload, dict): + raise ValueError("JSON body must be an object") + return payload + + def write_json(self, payload: dict[str, Any], status: int = 200) -> None: + body = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A002 - stdlib override name + print(f"{self.address_string()} - {format % args}", file=sys.stderr, flush=True) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Dry-run Atlas/Hermes router classifier") + parser.add_argument("--host", default=os.environ.get("OPENVINO_CLASSIFIER_HOST", DEFAULT_HOST)) + parser.add_argument("--port", type=int, default=int(os.environ.get("OPENVINO_CLASSIFIER_PORT", DEFAULT_PORT))) + parser.add_argument("--embed-url", default=os.environ.get("OPENVINO_CLASSIFIER_EMBED_URL", DEFAULT_EMBED_URL)) + parser.add_argument("--timeout-s", type=float, default=float(os.environ.get("OPENVINO_CLASSIFIER_TIMEOUT_S", "30"))) + parser.add_argument("--no-warmup", action="store_true", help="skip prototype embedding warmup until first request") + args = parser.parse_args() + + service = ClassifierService(args.embed_url, timeout_s=args.timeout_s) + if not args.no_warmup: + service.warmup() + httpd = ThreadingHTTPServer((args.host, args.port), Handler) + httpd.classifier_service = service # type: ignore[attr-defined] + print(f"{SERVICE} listening on {args.host}:{args.port} embed_url={args.embed_url} mode=dry_run", flush=True) + try: + httpd.serve_forever() + except KeyboardInterrupt: + pass + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/openvino-classifier-npu/tests/test_router_classifier.py b/openvino-classifier-npu/tests/test_router_classifier.py new file mode 100644 index 0000000..c044c0c --- /dev/null +++ b/openvino-classifier-npu/tests/test_router_classifier.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import importlib.util +import json +import sys +import unittest +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +MODULE_PATH = ROOT / "router_classifier.py" +spec = importlib.util.spec_from_file_location("router_classifier", MODULE_PATH) +assert spec and spec.loader +router_classifier = importlib.util.module_from_spec(spec) +sys.modules["router_classifier"] = router_classifier +spec.loader.exec_module(router_classifier) + + +class FakeClient: + def embed(self, texts, *, purpose="query"): + # Deterministic toy embeddings based on keyword buckets. The tests focus on + # rule safety and API shape; live smoke tests cover the real NPU upstream. + vectors = [] + for text in texts: + t = text.lower() + vec = [0.0] * 8 + if any(w in t for w in ["time", "current", "weather", "news", "port", "git", "logs", "systemd"]): + vec[0] = 1.0 + if any(w in t for w in ["remember", "prefer", "preference"]): + vec[1] = 1.0 + if any(w in t for w in ["urgent", "down", "outage", "critical"]): + vec[2] = 1.0 + if any(w in t for w in ["code", "pytest", "debug", "git", "diff"]): + vec[3] = 1.0 + if any(w in t for w in ["service", "systemd", "port", "gateway", "docker"]): + vec[4] = 1.0 + if any(w in t for w in ["kanban", "task", "blocked", "review"]): + vec[5] = 1.0 + if any(w in t for w in ["light", "thermostat"]): + vec[6] = 1.0 + if any(w in t for w in ["transcribe", "voice", "memo", "audio"]): + vec[7] = 1.0 + if not any(vec): + vec[0] = 0.2 + vectors.append(vec) + return router_classifier.EmbedResult(vectors=vectors, npu_busy_delta_us=123, duration_ms=1.0, embedding_dim=8) + + +class RouterClassifierTests(unittest.TestCase): + def service(self): + svc = router_classifier.ClassifierService("http://fake.local/v1/embeddings") + svc.client = FakeClient() + svc.warmup() + return svc + + def test_health_and_label_schema(self): + svc = self.service() + health = svc.health() + self.assertEqual(health["service"], "atlas-router-classifier") + self.assertEqual(health["mode"], "dry_run") + self.assertIn("tool_needed", health["labels"]) + labels = svc.labels() + self.assertIn("workflow_category", labels["enums"]) + self.assertIn("safety_confirmation_required", labels["thresholds"]) + + def test_explicit_preference_is_memory_candidate(self): + result = self.service().classify("pref", "Remember that I prefer concise terminal replies.") + self.assertEqual(result["labels"]["memory_candidate"]["value"], "user_preference") + self.assertGreaterEqual(result["labels"]["memory_candidate"]["confidence"], 0.78) + self.assertFalse(result["labels"]["safety_confirmation_required"]["value"]) + + def test_current_local_state_needs_tool(self): + result = self.service().classify("port", "Check whether port 18819 is listening and inspect systemd logs.") + self.assertTrue(result["labels"]["tool_needed"]["value"]) + self.assertIn("local_state_requested", result["labels"]["tool_needed"]["reason_codes"]) + + def test_live_gateway_restart_requires_confirmation(self): + result = self.service().classify("safe", "Restart the live Atlas gateway and switch primary routing.") + self.assertTrue(result["labels"]["safety_confirmation_required"]["value"]) + self.assertIn("live_service_or_routing_change", result["labels"]["safety_confirmation_required"]["reason_codes"]) + + def test_batch_shape(self): + result = self.service().batch_classify([ + {"id": "a", "text": "What time is it?"}, + {"id": "b", "text": "Delete the existing collection and reindex it in place."}, + ]) + self.assertEqual(result["model"], router_classifier.MODEL) + self.assertEqual(len(result["results"]), 2) + self.assertGreater(result["npu_busy_delta_us"], 0) + + def test_fixture_file_is_valid_jsonl(self): + fixture = ROOT / "fixtures" / "atlas_hermes_messages.jsonl" + rows = [json.loads(line) for line in fixture.read_text().splitlines() if line.strip()] + self.assertGreaterEqual(len(rows), 8) + for row in rows: + self.assertIn("id", row) + self.assertIn("text", row) + self.assertIn("expected", row) + + +if __name__ == "__main__": + unittest.main()