290 lines
11 KiB
Python
290 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""Local-only OpenVINO GenAI NPU worker.
|
|
|
|
Small bounded LLM worker for cheap background tasks. It intentionally does not
|
|
wire into Atlas/Hermes routing and does not log raw prompts by default.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import socket
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from typing import Any, cast
|
|
from urllib.parse import urlparse
|
|
|
|
# Model identifier reported by /healthz, /models and every generation response.
MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov"
# Default for --model-path; overridable through the OV_GENAI_NPU_MODEL env var.
DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov"
# Default for --cache-dir; overridable through the OV_GENAI_NPU_CACHE env var.
DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4"
# File read as an integer before and after each generation
# (presumably cumulative NPU busy time in microseconds, going by the name).
BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
# Default bind address and port; the CLI refuses any host other than 127.0.0.1.
HOST = "127.0.0.1"
PORT = 18820
# Inputs longer than this many characters are rejected before generation.
MAX_INPUT_CHARS = 6000
# Hard upper bound on the per-request max_new_tokens value.
MAX_NEW_TOKENS = 256
# Keyword options passed to the LLMPipeline constructor. CACHE_DIR is replaced
# with the worker's own cache directory when the pipeline is loaded.
GENAI_CONFIG = {
    "CACHE_DIR": DEFAULT_CACHE_DIR,
    "MAX_PROMPT_LEN": 1024,
    "MIN_RESPONSE_LEN": 64,
    "PREFILL_HINT": "DYNAMIC",
    "GENERATE_HINT": "FAST_COMPILE",
}
# Per-job max_new_tokens used when the caller does not supply one.
DEFAULTS = {
    "title": 32,
    "summary": 160,
    "memory_candidate": 192,
    "notification": 96,
}
# Per-job prompt templates; "{input}" is filled with the stripped user input.
# The keys of this mapping define the set of supported jobs.
PROMPTS = {
    "title": "Write one concise title, 8 words or fewer. Return only the title.\n\nInput:\n{input}",
    "summary": "Summarize the input in one short paragraph or up to 4 bullets. Be factual and concise.\n\nInput:\n{input}",
    "memory_candidate": (
        "Extract durable memory candidates from the conversation excerpt. "
        "Return strict JSON with keys: candidates (array of objects with fact, confidence, reason), notes. "
        "Do not write memory; only propose candidates.\n\nInput:\n{input}"
    ),
    "notification": (
        "Condense this notification or log excerpt for a human. "
        "Return JSON with keys: severity (info|warning|error), category, summary, action_needed.\n\nInput:\n{input}"
    ),
}
|
|
|
|
|
|
def import_openvino_genai() -> Any:
    """Return the ``openvino_genai`` module, importing it only when called.

    Keeping the import inside the function lets unit tests load this file
    without the NPU virtualenv being installed.
    """
    import openvino_genai  # type: ignore[import-not-found]

    return openvino_genai
|
|
|
|
|
|
def listener_exists(host: str, port: int) -> bool:
    """Probe ``host:port`` and report whether something accepts TCP connections.

    The probe waits at most 0.2 seconds and always closes its socket.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.settimeout(0.2)
        # connect_ex returns an errno-style code instead of raising; 0 means connected.
        status = probe.connect_ex((host, port))
    finally:
        probe.close()
    return status == 0
|
|
|
|
|
|
def coerce_json(text: str) -> Any | None:
    """Best-effort extraction of a JSON value from model output.

    The whole (stripped) text is tried first. If that fails, the first
    ``{...}`` or ``[...]`` region is located and a single JSON value is decoded
    starting at its opening bracket. Decoding stops at the end of that value,
    so trailing prose does not break parsing even when it contains ``}`` or
    ``]`` characters.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON value, or ``None`` when the text is empty or no value
        can be decoded.
    """
    text = text.strip()
    if not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Same locator as before: leftmost opener that has a matching closer
    # somewhere later in the text.
    match = re.search(r"(\{.*\}|\[.*\])", text, re.S)
    if match is None:
        return None
    try:
        # raw_decode parses exactly one value and ignores whatever follows it,
        # unlike json.loads on the greedy regex span, which failed whenever the
        # span swallowed trailing text up to a later closing bracket.
        value, _consumed = json.JSONDecoder().raw_decode(text[match.start():])
    except json.JSONDecodeError:
        return None
    return value
|
|
|
|
|
|
@dataclass
class GenerationResult:
    """Outcome of a single ``NpuWorker.generate`` call."""

    # Stripped text returned by the pipeline.
    text: str
    # Decoded JSON for the memory_candidate/notification jobs; None otherwise
    # or when no JSON could be extracted.
    parsed_json: Any | None
    # Millisecond timings keyed by "load", "initial_load", "generate", "total".
    timing_ms: dict[str, float]
    # Busy counter after generation minus the value before it.
    npu_busy_delta_us: int
    # Busy counter value read just before generation.
    npu_busy_before_us: int
    # Busy counter value read just after generation.
    npu_busy_after_us: int
|
|
|
|
|
|
class NpuWorker:
    """Lazily-loaded OpenVINO GenAI pipeline targeting the "NPU" device.

    The pipeline is created on the first :meth:`generate` call (or an explicit
    :meth:`load`) and reused afterwards. Loading and generation inside
    :meth:`generate` happen under a lock, so concurrent callers run one at a
    time. The busy counter file is read before and after each generation so
    callers can check that the counter actually moved.
    """

    def __init__(
        self,
        model_path: str,
        cache_dir: str,
        *,
        busy_path: Path = BUSY_PATH,
        bind_host: str = HOST,
        bind_port: int = PORT,
    ) -> None:
        """Validate paths and prepare an unloaded worker.

        Args:
            model_path: Directory (or file) holding the exported model.
            cache_dir: Directory for the compiled-blob cache; created if missing.
            busy_path: File holding the NPU busy counter.
            bind_host: Host reported by :meth:`health` (informational only).
            bind_port: Port reported by :meth:`health` (informational only).

        Raises:
            FileNotFoundError: If ``model_path`` or ``busy_path`` does not exist.
        """
        self.model_path = Path(model_path)
        self.cache_dir = Path(cache_dir)
        self.busy_path = Path(busy_path)
        self.bind_host = bind_host
        self.bind_port = bind_port
        # Validate before touching the filesystem so a failed construction
        # does not leave a freshly created cache directory behind.
        if not self.model_path.exists():
            raise FileNotFoundError(f"model path does not exist: {self.model_path}")
        if not self.busy_path.exists():
            raise FileNotFoundError(f"NPU busy-time counter does not exist: {self.busy_path}")
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self._pipe = None
        self._load_ms: float | None = None
        self._lock = threading.Lock()
        self._loaded_at: float | None = None

    def read_busy(self) -> int:
        """Return the current busy counter parsed from ``busy_path``."""
        return int(self.busy_path.read_text().strip())

    def load(self) -> None:
        """Create the pipeline if it is not loaded yet; otherwise do nothing.

        Records how long the load took (``_load_ms``) and the wall-clock time
        at which it finished (``_loaded_at``).
        """
        if self._pipe is not None:
            return
        start = time.monotonic()
        # NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching.
        ov_genai = import_openvino_genai()
        config = GENAI_CONFIG | {"CACHE_DIR": str(self.cache_dir)}
        self._pipe = ov_genai.LLMPipeline(str(self.model_path), "NPU", **config)
        self._load_ms = round((time.monotonic() - start) * 1000, 2)
        self._loaded_at = time.time()

    def generate(self, job: str, user_input: str, max_new_tokens: int | None = None) -> GenerationResult:
        """Run one bounded generation job.

        Args:
            job: One of the keys of ``PROMPTS``.
            user_input: Non-empty text of at most ``MAX_INPUT_CHARS`` characters.
            max_new_tokens: Token budget between 1 and ``MAX_NEW_TOKENS``;
                ``None`` selects the per-job default from ``DEFAULTS``.

        Returns:
            A :class:`GenerationResult` with the stripped text, optional parsed
            JSON, timings and busy-counter readings.

        Raises:
            ValueError: For an unsupported job, an empty, non-string or
                over-long input, or an out-of-range ``max_new_tokens``.
        """
        if job not in PROMPTS:
            raise ValueError(f"unsupported job: {job}")
        if not isinstance(user_input, str) or not user_input.strip():
            raise ValueError("input must be a non-empty string")
        if len(user_input) > MAX_INPUT_CHARS:
            raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}")
        # Only None means "use the default". An explicit 0 must fail the range
        # check below rather than silently falling back to the job default.
        if max_new_tokens is None:
            max_new_tokens = DEFAULTS[job]
        max_new_tokens = int(max_new_tokens)
        if max_new_tokens < 1 or max_new_tokens > MAX_NEW_TOKENS:
            raise ValueError(f"max_new_tokens must be between 1 and {MAX_NEW_TOKENS}")
        prompt = PROMPTS[job].format(input=user_input.strip())
        with self._lock:
            # "load" is the time this call spent in load(); it is close to zero
            # once the pipeline exists.
            load_start = time.monotonic()
            self.load()
            load_ms = round((time.monotonic() - load_start) * 1000, 2)
            before = self.read_busy()
            gen_start = time.monotonic()
            pipe = cast(Any, self._pipe)
            text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip()
            generate_ms = round((time.monotonic() - gen_start) * 1000, 2)
            after = self.read_busy()
        # Only the two JSON-producing jobs get their output parsed.
        parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None
        if job == "memory_candidate" and isinstance(parsed, list):
            parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"}
        return GenerationResult(
            text=text,
            parsed_json=parsed,
            timing_ms={"load": load_ms, "initial_load": self._load_ms or 0.0, "generate": generate_ms, "total": round(load_ms + generate_ms, 2)},
            npu_busy_delta_us=after - before,
            npu_busy_before_us=before,
            npu_busy_after_us=after,
        )

    def health(self) -> dict[str, Any]:
        """Return a status snapshot: configuration, load state, limits and the busy counter."""
        return {
            "ok": True,
            "model": MODEL_ID,
            "model_path": str(self.model_path),
            "device": "NPU",
            "cache_dir": str(self.cache_dir),
            "cache_exists": self.cache_dir.exists(),
            "loaded": self._pipe is not None,
            "initial_load_ms": self._load_ms,
            "loaded_at": self._loaded_at,
            "busy_time_us": self.read_busy(),
            "max_input_chars": MAX_INPUT_CHARS,
            "max_new_tokens": MAX_NEW_TOKENS,
            "jobs": sorted(PROMPTS),
            "bind": f"{self.bind_host}:{self.bind_port}",
        }
|
|
|
|
|
|
def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> dict[str, Any]:
    """Assemble the JSON-serialisable response body for one finished job."""
    payload: dict[str, Any] = {"model": MODEL_ID, "device": "NPU", "job": job}
    payload["text"] = result.text
    payload["json"] = result.parsed_json
    payload["timing_ms"] = result.timing_ms
    # The busy-counter readings are copied under the same names they have on
    # the result object.
    for counter in ("npu_busy_delta_us", "npu_busy_before_us", "npu_busy_after_us"):
        payload[counter] = getattr(result, counter)
    payload["cache_dir"] = str(worker.cache_dir)
    return payload
|
|
|
|
|
|
def make_handler(worker: NpuWorker):
    """Build a request-handler class whose routes are served by *worker*.

    Routes:
        GET  /healthz                               worker health snapshot
        GET  /models                                the single served model
        POST /v1/worker/generate                    job taken from the body ("summary" by default)
        POST /v1/worker/extract-memory-candidates   fixed job "memory_candidate"
        POST /v1/worker/condense-notification       fixed job "notification"

    POST bodies are JSON objects with ``input`` (string) and optional ``job``
    and ``max_new_tokens``. Status codes: 200 on success, 400 for invalid
    requests, 404 for unknown paths, 413 for oversized bodies, 500 for
    unexpected failures, and 503 when the busy counter did not increase
    during generation.
    """

    class Handler(BaseHTTPRequestHandler):
        server_version = "openvino-genai-npu-worker/0.2"
        # Generous cap on request bodies, far above anything a valid request
        # needs, so a bogus Content-Length cannot make the server buffer
        # arbitrary amounts of data.
        max_body_bytes = 1024 * 1024

        def log_message(self, format: str, *args: Any) -> None:
            # Log only method/path/status metadata, not raw request bodies.
            print(f"{self.client_address[0]} {format % args}")

        def send_json(self, status: int, payload: Any) -> None:
            """Send *payload* as an indented JSON response with the given status."""
            body = json.dumps(payload, indent=2).encode("utf-8")
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def _read_body(self) -> bytes | None:
            """Return the request body, or None after replying with an error."""
            try:
                length = int(self.headers.get("Content-Length", "0"))
            except ValueError:
                length = -1
            if length < 0:
                # A negative length would make rfile.read() read until EOF.
                self.send_json(400, {"error": "invalid Content-Length"})
                return None
            if length > self.max_body_bytes:
                self.send_json(413, {"error": f"request body too large: {length} bytes > {self.max_body_bytes}"})
                return None
            return self.rfile.read(length)

        def do_GET(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            if path == "/healthz":
                self.send_json(200, worker.health())
            elif path == "/models":
                self.send_json(200, {"models": [{"id": MODEL_ID, "path": str(worker.model_path), "device": "NPU"}]})
            else:
                self.send_json(404, {"error": "not found"})

        def do_POST(self) -> None:  # noqa: N802
            path = urlparse(self.path).path
            # None means "take the job from the request body".
            routes = {
                "/v1/worker/generate": None,
                "/v1/worker/extract-memory-candidates": "memory_candidate",
                "/v1/worker/condense-notification": "notification",
            }
            if path not in routes:
                self.send_json(404, {"error": "not found"})
                return
            raw = self._read_body()
            if raw is None:
                return
            try:
                payload = json.loads(raw or b"{}")
                if not isinstance(payload, dict):
                    raise ValueError("request body must be a JSON object")
                job = routes[path] or str(payload.get("job", "summary"))
                if job == "memory":
                    job = "memory_candidate"
                user_input = payload.get("input", "")
                # Do not coerce with str(): that would turn JSON null into the
                # text "None" and numbers/objects into their repr.
                if not isinstance(user_input, str):
                    raise ValueError("input must be a non-empty string")
                result = worker.generate(job, user_input, payload.get("max_new_tokens"))
                body = response_payload(worker, job, result)
            except (ValueError, TypeError) as exc:
                # Malformed JSON, bad field types and validation failures are
                # the caller's fault.
                self.send_json(400, {"error": str(exc)})
                return
            except Exception as exc:
                # Anything else (runtime or device failure) is a server-side error.
                self.send_json(500, {"error": str(exc)})
                return
            if result.npu_busy_delta_us <= 0:
                body["error"] = "NPU busy-time counter did not increase during generation"
                self.send_json(503, body)
                return
            self.send_json(200, body)

    return Handler
|
|
|
|
|
|
def cli(argv: list[str] | None = None) -> int:
    """Command-line entry point: run one job, or serve the HTTP API.

    With ``--job`` a single generation is run and its response payload is
    printed as JSON. Without it, an HTTP server is started on the requested
    address unless something is already listening there.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success; 2 when a ``--job`` run finished but the NPU busy counter
        did not increase.

    Raises:
        SystemExit: If the host is not 127.0.0.1 or a listener already exists
            on the requested address.
    """
    parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker")
    parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH))
    parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR))
    parser.add_argument("--host", default=os.environ.get("OV_GENAI_NPU_HOST", HOST))
    parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT)))
    parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP")
    parser.add_argument("--input", help="Input text for --job")
    parser.add_argument("--max-new-tokens", type=int)
    args = parser.parse_args(argv)

    # The worker is local-only by design; binding elsewhere needs a code change.
    if args.host != "127.0.0.1":
        raise SystemExit("Refusing non-local bind without code change/explicit approval")

    worker = NpuWorker(args.model_path, args.cache_dir, bind_host=args.host, bind_port=args.port)
    if args.job:
        result = worker.generate(args.job, args.input or "", args.max_new_tokens)
        print(json.dumps(response_payload(worker, args.job, result), indent=2))
        return 0 if result.npu_busy_delta_us > 0 else 2

    if listener_exists(args.host, args.port):
        raise SystemExit(f"Refusing to start: listener already exists on {args.host}:{args.port}")
    # Using the server as a context manager guarantees server_close() runs, so
    # the listening socket is released however serve_forever() exits.
    with ThreadingHTTPServer((args.host, args.port), make_handler(worker)) as server:
        print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged")
        server.serve_forever()
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(cli())
|