#!/usr/bin/env python3 """Local-only OpenVINO GenAI NPU worker. Small bounded LLM worker for cheap background tasks. It intentionally does not wire into Atlas/Hermes routing and does not log raw prompts by default. """ from __future__ import annotations import argparse import json import os import re import threading import time from dataclasses import dataclass from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any, cast from urllib.parse import urlparse import openvino_genai as ov_genai # type: ignore[import-not-found] MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") HOST = "127.0.0.1" PORT = 18820 MAX_INPUT_CHARS = 6000 DEFAULTS = { "title": 32, "summary": 160, "memory_candidate": 192, "notification": 96, } PROMPTS = { "title": "Write one concise title, 8 words or fewer. Return only the title.\n\nInput:\n{input}", "summary": "Summarize the input in one short paragraph or up to 4 bullets. Be factual and concise.\n\nInput:\n{input}", "memory_candidate": ( "Extract durable memory candidates from the conversation excerpt. " "Return strict JSON with keys: candidates (array of objects with fact, confidence, reason), notes. " "Do not write memory; only propose candidates.\n\nInput:\n{input}" ), "notification": ( "Condense this notification or log excerpt for a human. " "Return JSON with keys: severity (info|warning|error), category, summary, action_needed.\n\nInput:\n{input}" ), } def read_busy() -> int: return int(BUSY_PATH.read_text().strip()) def coerce_json(text: str) -> Any | None: text = text.strip() if not text: return None try: return json.loads(text) except json.JSONDecodeError: match = re.search(r"(\{.*\}|\[.*\])", text, re.S) if match: try: return json.loads(match.group(1)) except json.JSONDecodeError: return None return None @dataclass class GenerationResult: text: str parsed_json: Any | None timing_ms: dict[str, float] npu_busy_delta_us: int npu_busy_before_us: int npu_busy_after_us: int class NpuWorker: def __init__(self, model_path: str, cache_dir: str): self.model_path = Path(model_path) self.cache_dir = Path(cache_dir) self.cache_dir.mkdir(parents=True, exist_ok=True) self._pipe = None self._load_ms: float | None = None self._lock = threading.Lock() self._loaded_at: float | None = None if not self.model_path.exists(): raise FileNotFoundError(f"model path does not exist: {self.model_path}") def load(self) -> None: if self._pipe is not None: return start = time.monotonic() # NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching. self._pipe = ov_genai.LLMPipeline( str(self.model_path), "NPU", CACHE_DIR=str(self.cache_dir), MAX_PROMPT_LEN=1024, MIN_RESPONSE_LEN=64, PREFILL_HINT="DYNAMIC", GENERATE_HINT="FAST_COMPILE", ) self._load_ms = round((time.monotonic() - start) * 1000, 2) self._loaded_at = time.time() def generate(self, job: str, user_input: str, max_new_tokens: int | None = None) -> GenerationResult: if job not in PROMPTS: raise ValueError(f"unsupported job: {job}") if not isinstance(user_input, str) or not user_input.strip(): raise ValueError("input must be a non-empty string") if len(user_input) > MAX_INPUT_CHARS: raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}") max_new_tokens = int(max_new_tokens or DEFAULTS[job]) if max_new_tokens < 1 or max_new_tokens > 256: raise ValueError("max_new_tokens must be between 1 and 256") prompt = PROMPTS[job].format(input=user_input.strip()) with self._lock: load_start = time.monotonic() self.load() load_ms = round((time.monotonic() - load_start) * 1000, 2) before = read_busy() gen_start = time.monotonic() pipe = cast(Any, self._pipe) text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip() generate_ms = round((time.monotonic() - gen_start) * 1000, 2) after = read_busy() parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None if job == "memory_candidate" and isinstance(parsed, list): parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"} return GenerationResult( text=text, parsed_json=parsed, timing_ms={"load": load_ms, "initial_load": self._load_ms or 0.0, "generate": generate_ms, "total": round(load_ms + generate_ms, 2)}, npu_busy_delta_us=after - before, npu_busy_before_us=before, npu_busy_after_us=after, ) def health(self) -> dict[str, Any]: return { "ok": True, "model": MODEL_ID, "model_path": str(self.model_path), "device": "NPU", "cache_dir": str(self.cache_dir), "cache_exists": self.cache_dir.exists(), "loaded": self._pipe is not None, "initial_load_ms": self._load_ms, "loaded_at": self._loaded_at, "busy_time_us": read_busy(), "max_input_chars": MAX_INPUT_CHARS, "jobs": sorted(PROMPTS), "bind": f"{HOST}:{PORT}", } def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> dict[str, Any]: return { "model": MODEL_ID, "device": "NPU", "job": job, "text": result.text, "json": result.parsed_json, "timing_ms": result.timing_ms, "npu_busy_delta_us": result.npu_busy_delta_us, "npu_busy_before_us": result.npu_busy_before_us, "npu_busy_after_us": result.npu_busy_after_us, "cache_dir": str(worker.cache_dir), } def make_handler(worker: NpuWorker): class Handler(BaseHTTPRequestHandler): server_version = "openvino-genai-npu-worker/0.1" def log_message(self, format: str, *args: Any) -> None: # Log only method/path/status metadata, not raw request bodies. print(f"{self.client_address[0]} {format % args}") def send_json(self, status: int, payload: Any) -> None: body = json.dumps(payload, indent=2).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self) -> None: # noqa: N802 path = urlparse(self.path).path if path == "/healthz": self.send_json(200, worker.health()) elif path == "/models": self.send_json(200, {"models": [{"id": MODEL_ID, "path": str(worker.model_path), "device": "NPU"}]}) else: self.send_json(404, {"error": "not found"}) def do_POST(self) -> None: # noqa: N802 path = urlparse(self.path).path route_job = { "/v1/worker/generate": None, "/v1/worker/extract-memory-candidates": "memory_candidate", "/v1/worker/condense-notification": "notification", }.get(path, "__missing__") if route_job == "__missing__": self.send_json(404, {"error": "not found"}) return try: length = int(self.headers.get("Content-Length", "0")) payload = json.loads(self.rfile.read(length) or b"{}") job = route_job or str(payload.get("job", "summary")) if job == "memory": job = "memory_candidate" result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens")) self.send_json(200, response_payload(worker, job, result)) except Exception as exc: self.send_json(400, {"error": str(exc)}) return Handler def cli(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker") parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH)) parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR)) parser.add_argument("--host", default=HOST) parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT))) parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP") parser.add_argument("--input", help="Input text for --job") parser.add_argument("--max-new-tokens", type=int) args = parser.parse_args(argv) worker = NpuWorker(args.model_path, args.cache_dir) if args.job: result = worker.generate(args.job, args.input or "", args.max_new_tokens) print(json.dumps(response_payload(worker, args.job, result), indent=2)) return 0 if result.npu_busy_delta_us > 0 else 2 if args.host != "127.0.0.1": raise SystemExit("Refusing non-local bind without code change/explicit approval") server = ThreadingHTTPServer((args.host, args.port), make_handler(worker)) print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged") server.serve_forever() return 0 if __name__ == "__main__": raise SystemExit(cli())