Files
swarm-master/openvino-genai-npu-worker/worker.py
2026-06-04 13:07:51 -07:00

290 lines
11 KiB
Python

#!/usr/bin/env python3
"""Local-only OpenVINO GenAI NPU worker.
Small bounded LLM worker for cheap background tasks. It intentionally does not
wire into Atlas/Hermes routing and does not log raw prompts by default.
"""
from __future__ import annotations
import argparse
import json
import os
import re
import socket
import threading
import time
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any, cast
from urllib.parse import urlparse
# Model identifier reported by the health/models endpoints and in responses.
MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov"

# Default on-disk locations; the CLI lets flags or environment variables
# override both of them.
DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4"

# sysfs counter sampled before and after each generation.
BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")

# Default bind address; the CLI refuses any host other than 127.0.0.1.
HOST = "127.0.0.1"
PORT = 18820

# Hard request limits enforced by NpuWorker.generate().
MAX_INPUT_CHARS = 6000
MAX_NEW_TOKENS = 256

# Keyword arguments forwarded to the LLMPipeline constructor. CACHE_DIR is
# replaced with the worker's own cache directory at load time.
GENAI_CONFIG = {
    "CACHE_DIR": DEFAULT_CACHE_DIR,
    "MAX_PROMPT_LEN": 1024,
    "MIN_RESPONSE_LEN": 64,
    "PREFILL_HINT": "DYNAMIC",
    "GENERATE_HINT": "FAST_COMPILE",
}

# Per-job default for max_new_tokens when the caller does not supply one.
DEFAULTS = {
    "title": 32,
    "summary": 160,
    "memory_candidate": 192,
    "notification": 96,
}

# Prompt template per job; "{input}" is filled with the stripped user input.
# The keys double as the set of supported job names.
PROMPTS = {
    "title": "Write one concise title, 8 words or fewer. Return only the title.\n\nInput:\n{input}",
    "summary": "Summarize the input in one short paragraph or up to 4 bullets. Be factual and concise.\n\nInput:\n{input}",
    "memory_candidate": (
        "Extract durable memory candidates from the conversation excerpt. "
        "Return strict JSON with keys: candidates (array of objects with fact, confidence, reason), notes. "
        "Do not write memory; only propose candidates.\n\nInput:\n{input}"
    ),
    "notification": (
        "Condense this notification or log excerpt for a human. "
        "Return JSON with keys: severity (info|warning|error), category, summary, action_needed.\n\nInput:\n{input}"
    ),
}
def import_openvino_genai() -> Any:
    """Return the ``openvino_genai`` module, importing it at call time.

    The import is kept inside the function so that unit tests can import this
    file without the NPU virtualenv being installed.
    """
    import openvino_genai  # type: ignore[import-not-found]

    return openvino_genai
def listener_exists(host: str, port: int) -> bool:
    """Report whether something already accepts TCP connections on host:port.

    A throwaway IPv4 socket attempts to connect with a 0.2 second timeout;
    a ``connect_ex`` status of 0 means the connection was accepted.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.settimeout(0.2)
        status = probe.connect_ex((host, port))
    finally:
        # Always release the probe socket, even if the connect attempt raised.
        probe.close()
    return status == 0
def coerce_json(text: str) -> Any | None:
    """Best-effort extraction of a JSON value from model output.

    The whole (stripped) text is parsed first. If that fails, every ``{`` or
    ``[`` is tried in order as the start of an embedded JSON value and the
    first one that decodes is returned. Unlike a greedy "first brace to last
    brace" match, this still recovers the value when the model appends prose
    or extra braces after it.

    Args:
        text: Raw model output, possibly wrapped in prose or code fences.

    Returns:
        The decoded JSON value, or ``None`` when the text is empty or no JSON
        object/array can be decoded from it.
    """
    text = text.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    decoder = json.JSONDecoder()
    for opener in re.finditer(r"[{\[]", text):
        try:
            # raw_decode stops at the end of the value and ignores what follows.
            value, _end = decoder.raw_decode(text, opener.start())
        except json.JSONDecodeError:
            continue
        return value
    return None
@dataclass
class GenerationResult:
    """Outcome of a single ``NpuWorker.generate`` call."""

    # Stripped text returned by the pipeline.
    text: str
    # Decoded JSON for the JSON-producing jobs; None for other jobs or when
    # nothing could be decoded.
    parsed_json: Any | None
    # Wall-clock timings in milliseconds, keyed by phase name.
    timing_ms: dict[str, float]
    # NPU busy-time counter: difference, then the raw readings taken
    # immediately before and after generation (microseconds).
    npu_busy_delta_us: int
    npu_busy_before_us: int
    npu_busy_after_us: int
class NpuWorker:
    """Lazily loaded OpenVINO GenAI pipeline bound to the NPU device.

    The pipeline is created on first use. ``generate`` holds an internal lock
    around loading and inference, so only one request drives the pipeline at a
    time.
    """

    def __init__(
        self,
        model_path: str,
        cache_dir: str,
        *,
        busy_path: Path = BUSY_PATH,
        bind_host: str = HOST,
        bind_port: int = PORT,
    ):
        """Validate paths and prepare (but do not load) the worker.

        Args:
            model_path: Directory of the exported model.
            cache_dir: Directory passed to the pipeline as ``CACHE_DIR``;
                created if missing.
            busy_path: File exposing the NPU busy-time counter.
            bind_host: Host reported by ``health``.
            bind_port: Port reported by ``health``.

        Raises:
            FileNotFoundError: If the model path or the busy-time counter
                does not exist.
        """
        self.model_path = Path(model_path)
        self.cache_dir = Path(cache_dir)
        self.busy_path = Path(busy_path)
        self.bind_host = bind_host
        self.bind_port = bind_port
        self._pipe = None
        self._load_ms: float | None = None
        self._lock = threading.Lock()
        self._loaded_at: float | None = None
        # Validate before touching the filesystem so a failed construction
        # does not leave a freshly created cache directory behind.
        if not self.model_path.exists():
            raise FileNotFoundError(f"model path does not exist: {self.model_path}")
        if not self.busy_path.exists():
            raise FileNotFoundError(f"NPU busy-time counter does not exist: {self.busy_path}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def read_busy(self) -> int:
        """Return the current value of the NPU busy-time counter."""
        return int(self.busy_path.read_text().strip())

    def load(self) -> None:
        """Create the pipeline if it has not been created yet.

        Records how long the first load took and when it finished. Calling
        this again after a successful load is a no-op.
        """
        if self._pipe is not None:
            return
        start = time.monotonic()
        # NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching.
        ov_genai = import_openvino_genai()
        config = GENAI_CONFIG | {"CACHE_DIR": str(self.cache_dir)}
        self._pipe = ov_genai.LLMPipeline(str(self.model_path), "NPU", **config)
        self._load_ms = round((time.monotonic() - start) * 1000, 2)
        self._loaded_at = time.time()

    def generate(self, job: str, user_input: str, max_new_tokens: int | None = None) -> GenerationResult:
        """Run one bounded generation job.

        Args:
            job: One of the keys of ``PROMPTS``.
            user_input: Non-empty text, at most ``MAX_INPUT_CHARS`` characters.
            max_new_tokens: Token budget between 1 and ``MAX_NEW_TOKENS``;
                ``None`` selects the per-job default.

        Returns:
            The generated text, decoded JSON (for the JSON-producing jobs),
            timings, and the NPU busy-time counter readings.

        Raises:
            ValueError: If the job is unknown, the input is empty or too
                long, or ``max_new_tokens`` is out of range.
        """
        if job not in PROMPTS:
            raise ValueError(f"unsupported job: {job}")
        if not isinstance(user_input, str) or not user_input.strip():
            raise ValueError("input must be a non-empty string")
        if len(user_input) > MAX_INPUT_CHARS:
            raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}")
        # Only a missing value selects the default; an explicit 0 is an
        # out-of-range request and must be rejected, not silently replaced.
        if max_new_tokens is None:
            max_new_tokens = DEFAULTS[job]
        max_new_tokens = int(max_new_tokens)
        if max_new_tokens < 1 or max_new_tokens > MAX_NEW_TOKENS:
            raise ValueError(f"max_new_tokens must be between 1 and {MAX_NEW_TOKENS}")
        prompt = PROMPTS[job].format(input=user_input.strip())
        with self._lock:
            load_start = time.monotonic()
            self.load()
            load_ms = round((time.monotonic() - load_start) * 1000, 2)
            # Sample the counter inside the lock so the delta covers only
            # this request's generation.
            before = self.read_busy()
            gen_start = time.monotonic()
            pipe = cast(Any, self._pipe)
            text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip()
            generate_ms = round((time.monotonic() - gen_start) * 1000, 2)
            after = self.read_busy()
        parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None
        if job == "memory_candidate" and isinstance(parsed, list):
            parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"}
        return GenerationResult(
            text=text,
            parsed_json=parsed,
            timing_ms={"load": load_ms, "initial_load": self._load_ms or 0.0, "generate": generate_ms, "total": round(load_ms + generate_ms, 2)},
            npu_busy_delta_us=after - before,
            npu_busy_before_us=before,
            npu_busy_after_us=after,
        )

    def health(self) -> dict[str, Any]:
        """Return a status snapshot: model, cache, load state, limits, bind."""
        return {
            "ok": True,
            "model": MODEL_ID,
            "model_path": str(self.model_path),
            "device": "NPU",
            "cache_dir": str(self.cache_dir),
            "cache_exists": self.cache_dir.exists(),
            "loaded": self._pipe is not None,
            "initial_load_ms": self._load_ms,
            "loaded_at": self._loaded_at,
            "busy_time_us": self.read_busy(),
            "max_input_chars": MAX_INPUT_CHARS,
            "max_new_tokens": MAX_NEW_TOKENS,
            "jobs": sorted(PROMPTS),
            "bind": f"{self.bind_host}:{self.bind_port}",
        }
def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> dict[str, Any]:
    """Assemble the JSON-serializable response body for a finished job.

    Combines static model/device info, the job name, the generation result
    fields, and the worker's cache directory into one dict. Key order is kept
    stable because the dict is dumped as-is.
    """
    payload: dict[str, Any] = {"model": MODEL_ID, "device": "NPU", "job": job}
    payload["text"] = result.text
    payload["json"] = result.parsed_json
    payload["timing_ms"] = result.timing_ms
    for counter in ("npu_busy_delta_us", "npu_busy_before_us", "npu_busy_after_us"):
        payload[counter] = getattr(result, counter)
    payload["cache_dir"] = str(worker.cache_dir)
    return payload
def make_handler(worker: NpuWorker):
    """Build the HTTP request handler class bound to ``worker``.

    Routes:
        GET  /healthz                               worker health snapshot
        GET  /models                                served model description
        POST /v1/worker/generate                    job taken from the body
        POST /v1/worker/extract-memory-candidates   fixed "memory_candidate" job
        POST /v1/worker/condense-notification       fixed "notification" job

    POST status codes: 200 on success, 400 for malformed or invalid requests,
    413 for oversized bodies, 500 for unexpected worker failures, and 503
    when the NPU busy-time counter did not increase during generation.
    """
    # Upper bound on accepted request bodies; far above what the worker's
    # input-length limit needs, but stops a client from forcing large reads.
    max_body_bytes = 1024 * 1024

    class Handler(BaseHTTPRequestHandler):
        server_version = "openvino-genai-npu-worker/0.2"

        def log_message(self, format: str, *args: Any) -> None:
            # Log only method/path/status metadata, not raw request bodies.
            print(f"{self.client_address[0]} {format % args}")

        def send_json(self, status: int, payload: Any) -> None:
            """Send ``payload`` as an indented JSON response with ``status``."""
            body = json.dumps(payload, indent=2).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            if path == "/healthz":
                self.send_json(200, worker.health())
            elif path == "/models":
                self.send_json(200, {"models": [{"id": MODEL_ID, "path": str(worker.model_path), "device": "NPU"}]})
            else:
                self.send_json(404, {"error": "not found"})

        def do_POST(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            # A None value means the job name comes from the request body.
            routes = {
                "/v1/worker/generate": None,
                "/v1/worker/extract-memory-candidates": "memory_candidate",
                "/v1/worker/condense-notification": "notification",
            }
            if path not in routes:
                self.send_json(404, {"error": "not found"})
                return
            route_job = routes[path]

            # Validate the declared length before reading: a negative value
            # would make rfile.read() raise or block until the peer closes.
            try:
                length = int(self.headers.get("Content-Length", "0"))
                if length < 0:
                    raise ValueError("Content-Length must not be negative")
            except ValueError as exc:
                self.close_connection = True
                self.send_json(400, {"error": str(exc)})
                return
            if length > max_body_bytes:
                # The body is left unread, so the connection cannot be reused.
                self.close_connection = True
                self.send_json(413, {"error": f"request body too large: {length} bytes > {max_body_bytes}"})
                return

            try:
                payload = json.loads(self.rfile.read(length) or b"{}")
                if not isinstance(payload, dict):
                    raise ValueError("request body must be a JSON object")
                job = route_job or str(payload.get("job", "summary"))
                if job == "memory":
                    job = "memory_candidate"
                result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens"))
            except (ValueError, TypeError) as exc:
                # Malformed JSON and validation failures are the client's fault.
                self.send_json(400, {"error": str(exc)})
                return
            except Exception as exc:
                # Anything else (pipeline/NPU/filesystem) is a server-side failure.
                self.send_json(500, {"error": str(exc)})
                return

            body = response_payload(worker, job, result)
            if result.npu_busy_delta_us <= 0:
                body["error"] = "NPU busy-time counter did not increase during generation"
                self.send_json(503, body)
                return
            self.send_json(200, body)

    return Handler
def cli(argv: list[str] | None = None) -> int:
    """Command-line entry point: run one job, or serve HTTP on localhost.

    With ``--job`` the worker runs a single generation, prints the response
    payload as JSON, and returns 0 when the NPU busy-time counter increased
    (2 otherwise). Without ``--job`` it serves HTTP until interrupted.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Process exit code.

    Raises:
        SystemExit: If a non-local host is requested or a listener already
            exists on the target address.
    """
    parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker")
    parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH))
    parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR))
    parser.add_argument("--host", default=os.environ.get("OV_GENAI_NPU_HOST", HOST))
    parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT)))
    parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP")
    parser.add_argument("--input", help="Input text for --job")
    parser.add_argument("--max-new-tokens", type=int)
    args = parser.parse_args(argv)

    if args.host != "127.0.0.1":
        raise SystemExit("Refusing non-local bind without code change/explicit approval")

    worker = NpuWorker(args.model_path, args.cache_dir, bind_host=args.host, bind_port=args.port)

    if args.job:
        result = worker.generate(args.job, args.input or "", args.max_new_tokens)
        print(json.dumps(response_payload(worker, args.job, result), indent=2))
        return 0 if result.npu_busy_delta_us > 0 else 2

    if listener_exists(args.host, args.port):
        raise SystemExit(f"Refusing to start: listener already exists on {args.host}:{args.port}")

    # The context manager closes the listening socket on every exit path.
    with ThreadingHTTPServer((args.host, args.port), make_handler(worker)) as server:
        print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged")
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to stop the worker; exit cleanly.
            pass
    return 0


if __name__ == "__main__":
    raise SystemExit(cli())