feat: refresh OpenVINO GenAI NPU worker prototype

This commit is contained in:
William Valentin
2026-06-04 12:13:44 -07:00
parent cb874f9743
commit b538a5a1f9
7 changed files with 253 additions and 36 deletions
+64 -26
View File
@@ -10,6 +10,7 @@ import argparse
import json
import os
import re
import socket
import threading
import time
from dataclasses import dataclass
@@ -18,8 +19,6 @@ from pathlib import Path
from typing import Any, cast
from urllib.parse import urlparse
import openvino_genai as ov_genai # type: ignore[import-not-found]
MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov"
DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4"
@@ -27,6 +26,14 @@ BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
HOST = "127.0.0.1"
PORT = 18820
MAX_INPUT_CHARS = 6000
MAX_NEW_TOKENS = 256
GENAI_CONFIG = {
"CACHE_DIR": DEFAULT_CACHE_DIR,
"MAX_PROMPT_LEN": 1024,
"MIN_RESPONSE_LEN": 64,
"PREFILL_HINT": "DYNAMIC",
"GENERATE_HINT": "FAST_COMPILE",
}
DEFAULTS = {
"title": 32,
"summary": 160,
@@ -48,8 +55,20 @@ PROMPTS = {
}
def read_busy() -> int:
return int(BUSY_PATH.read_text().strip())
def import_openvino_genai() -> Any:
"""Import OpenVINO GenAI lazily so unit tests do not require the NPU venv."""
import openvino_genai as ov_genai # type: ignore[import-not-found]
return ov_genai
def listener_exists(host: str, port: int) -> bool:
"""Return True when a TCP listener already accepts connections."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(0.2)
return sock.connect_ex((host, port)) == 0
def coerce_json(text: str) -> Any | None:
@@ -79,9 +98,20 @@ class GenerationResult:
class NpuWorker:
def __init__(self, model_path: str, cache_dir: str):
def __init__(
self,
model_path: str,
cache_dir: str,
*,
busy_path: Path = BUSY_PATH,
bind_host: str = HOST,
bind_port: int = PORT,
):
self.model_path = Path(model_path)
self.cache_dir = Path(cache_dir)
self.busy_path = Path(busy_path)
self.bind_host = bind_host
self.bind_port = bind_port
self.cache_dir.mkdir(parents=True, exist_ok=True)
self._pipe = None
self._load_ms: float | None = None
@@ -89,21 +119,20 @@ class NpuWorker:
self._loaded_at: float | None = None
if not self.model_path.exists():
raise FileNotFoundError(f"model path does not exist: {self.model_path}")
if not self.busy_path.exists():
raise FileNotFoundError(f"NPU busy-time counter does not exist: {self.busy_path}")
def read_busy(self) -> int:
return int(self.busy_path.read_text().strip())
def load(self) -> None:
if self._pipe is not None:
return
start = time.monotonic()
# NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching.
self._pipe = ov_genai.LLMPipeline(
str(self.model_path),
"NPU",
CACHE_DIR=str(self.cache_dir),
MAX_PROMPT_LEN=1024,
MIN_RESPONSE_LEN=64,
PREFILL_HINT="DYNAMIC",
GENERATE_HINT="FAST_COMPILE",
)
ov_genai = import_openvino_genai()
config = GENAI_CONFIG | {"CACHE_DIR": str(self.cache_dir)}
self._pipe = ov_genai.LLMPipeline(str(self.model_path), "NPU", **config)
self._load_ms = round((time.monotonic() - start) * 1000, 2)
self._loaded_at = time.time()
@@ -115,19 +144,19 @@ class NpuWorker:
if len(user_input) > MAX_INPUT_CHARS:
raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}")
max_new_tokens = int(max_new_tokens or DEFAULTS[job])
if max_new_tokens < 1 or max_new_tokens > 256:
raise ValueError("max_new_tokens must be between 1 and 256")
if max_new_tokens < 1 or max_new_tokens > MAX_NEW_TOKENS:
raise ValueError(f"max_new_tokens must be between 1 and {MAX_NEW_TOKENS}")
prompt = PROMPTS[job].format(input=user_input.strip())
with self._lock:
load_start = time.monotonic()
self.load()
load_ms = round((time.monotonic() - load_start) * 1000, 2)
before = read_busy()
before = self.read_busy()
gen_start = time.monotonic()
pipe = cast(Any, self._pipe)
text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip()
generate_ms = round((time.monotonic() - gen_start) * 1000, 2)
after = read_busy()
after = self.read_busy()
parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None
if job == "memory_candidate" and isinstance(parsed, list):
parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"}
@@ -151,10 +180,11 @@ class NpuWorker:
"loaded": self._pipe is not None,
"initial_load_ms": self._load_ms,
"loaded_at": self._loaded_at,
"busy_time_us": read_busy(),
"busy_time_us": self.read_busy(),
"max_input_chars": MAX_INPUT_CHARS,
"max_new_tokens": MAX_NEW_TOKENS,
"jobs": sorted(PROMPTS),
"bind": f"{HOST}:{PORT}",
"bind": f"{self.bind_host}:{self.bind_port}",
}
@@ -175,7 +205,7 @@ def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> d
def make_handler(worker: NpuWorker):
class Handler(BaseHTTPRequestHandler):
server_version = "openvino-genai-npu-worker/0.1"
server_version = "openvino-genai-npu-worker/0.2"
def log_message(self, format: str, *args: Any) -> None:
# Log only method/path/status metadata, not raw request bodies.
@@ -215,7 +245,12 @@ def make_handler(worker: NpuWorker):
if job == "memory":
job = "memory_candidate"
result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens"))
self.send_json(200, response_payload(worker, job, result))
body = response_payload(worker, job, result)
if result.npu_busy_delta_us <= 0:
body["error"] = "NPU busy-time counter did not increase during generation"
self.send_json(503, body)
return
self.send_json(200, body)
except Exception as exc:
self.send_json(400, {"error": str(exc)})
@@ -226,21 +261,24 @@ def cli(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker")
parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH))
parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR))
parser.add_argument("--host", default=HOST)
parser.add_argument("--host", default=os.environ.get("OV_GENAI_NPU_HOST", HOST))
parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT)))
parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP")
parser.add_argument("--input", help="Input text for --job")
parser.add_argument("--max-new-tokens", type=int)
args = parser.parse_args(argv)
worker = NpuWorker(args.model_path, args.cache_dir)
if args.host != "127.0.0.1":
raise SystemExit("Refusing non-local bind without code change/explicit approval")
worker = NpuWorker(args.model_path, args.cache_dir, bind_host=args.host, bind_port=args.port)
if args.job:
result = worker.generate(args.job, args.input or "", args.max_new_tokens)
print(json.dumps(response_payload(worker, args.job, result), indent=2))
return 0 if result.npu_busy_delta_us > 0 else 2
if args.host != "127.0.0.1":
raise SystemExit("Refusing non-local bind without code change/explicit approval")
if listener_exists(args.host, args.port):
raise SystemExit(f"Refusing to start: listener already exists on {args.host}:{args.port}")
server = ThreadingHTTPServer((args.host, args.port), make_handler(worker))
print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged")
server.serve_forever()