From b538a5a1f900d7417cd6bf711c169a8f5839652e Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 4 Jun 2026 12:13:44 -0700 Subject: [PATCH] feat: refresh OpenVINO GenAI NPU worker prototype --- openvino-genai-npu-worker/CONTRACT.md | 11 +- openvino-genai-npu-worker/README.md | 26 ++++ openvino-genai-npu-worker/pytest.ini | 2 + openvino-genai-npu-worker/smoke_llm_npu.py | 28 ++-- .../systemd/openvino-genai-npu-worker.service | 1 + .../tests/test_worker.py | 131 ++++++++++++++++++ openvino-genai-npu-worker/worker.py | 90 ++++++++---- 7 files changed, 253 insertions(+), 36 deletions(-) create mode 100644 openvino-genai-npu-worker/pytest.ini create mode 100644 openvino-genai-npu-worker/tests/test_worker.py diff --git a/openvino-genai-npu-worker/CONTRACT.md b/openvino-genai-npu-worker/CONTRACT.md index babebbb..604170c 100644 --- a/openvino-genai-npu-worker/CONTRACT.md +++ b/openvino-genai-npu-worker/CONTRACT.md @@ -1,6 +1,6 @@ # Bounded OpenVINO GenAI NPU worker contract -Status: proposed prototype contract; not a live Atlas/Hermes routing dependency. +Status: prototype contract implemented locally; not a live Atlas/Hermes routing dependency. Default address: `http://127.0.0.1:18820`. ## Purpose and hard boundary @@ -167,7 +167,7 @@ Validation/error behavior: - Unsupported path: `404` JSON `{"error":"not found"}`. - Unsupported job, empty input, too-long input, invalid token bound, missing model, or generation failure: JSON `{"error":"..."}` with non-2xx preferred for future implementations. The current stdlib prototype returns `400` for these errors. -- If `npu_busy_delta_us <= 0`, the response should be treated as failed by smoke tests even if an HTTP handler emitted `200`. +- If `npu_busy_delta_us <= 0`, the response should be treated as failed by smoke tests even if an HTTP handler emitted `200`; the refreshed prototype returns `503` with the generation payload plus an `error` field. ## Prompt/job contract @@ -253,6 +253,13 @@ Also verify the temporary listener is gone: ss -ltnp | grep ':18820' && { echo 'temporary smoke server still running'; exit 1; } || true ``` +Unit tests that do not load the model or require private data: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +python -m pytest -q +``` + ## NPU busy-time verification plan Acceptance for any NPU claim requires all of the following: diff --git a/openvino-genai-npu-worker/README.md b/openvino-genai-npu-worker/README.md index 1baf519..0cfda4a 100644 --- a/openvino-genai-npu-worker/README.md +++ b/openvino-genai-npu-worker/README.md @@ -18,6 +18,7 @@ The worker does not write memory, does not restart Atlas/Hermes, does not change - `CONTRACT.md` — bounded-worker service contract, endpoint/CLI API, smoke plan, NPU verification, docs implications, and no-go criteria. - `worker.py` — stdlib HTTP API plus CLI wrapper. - `smoke_llm_npu.py` — direct GenAI smoke test with NPU busy-time verification. +- `tests/test_worker.py` — unit tests with a fake GenAI pipeline and synthetic busy-time counter. - `systemd/openvino-genai-npu-worker.service` — optional user-service template; not installed by this prototype. ## Model/cache @@ -73,15 +74,20 @@ Observed cold-ish smoke after download/cache setup: --input 'Kanban task asks for a small OpenVINO GenAI NPU worker prototype.' ``` +Exit code is non-zero if validation fails, generation fails, or the worker-reported `npu_busy_delta_us` is not positive. + ## HTTP usage Start locally only: ```bash cd /home/will/lab/swarm/openvino-genai-npu-worker +ss -ltnp | grep ':18820' && { echo 'port 18820 already in use'; exit 1; } || true /home/will/.venvs/npu/bin/python worker.py --host 127.0.0.1 --port 18820 ``` +The server also refuses startup if a listener is already accepting connections on `127.0.0.1:18820`. + Endpoints: ```text @@ -103,6 +109,26 @@ curl -s http://127.0.0.1:18820/v1/worker/generate \ Response includes `npu_busy_delta_us`; treat zero as failure even if HTTP status is 200. +## Unit tests + +These tests use only synthetic strings and a fake GenAI pipeline, so they do not load the model or touch private data: + +```bash +cd /home/will/lab/swarm/openvino-genai-npu-worker +python -m pytest -q +``` + +## Environment variables + +```text +OV_GENAI_NPU_MODEL=/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov +OV_GENAI_NPU_CACHE=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4 +OV_GENAI_NPU_HOST=127.0.0.1 +OV_GENAI_NPU_PORT=18820 +``` + +Only `127.0.0.1` is accepted by the current prototype; wider binds require an explicit code change and approval. + ## Safety boundaries - Binds only to `127.0.0.1` by default; non-local bind is refused in code. diff --git a/openvino-genai-npu-worker/pytest.ini b/openvino-genai-npu-worker/pytest.ini new file mode 100644 index 0000000..5ee6477 --- /dev/null +++ b/openvino-genai-npu-worker/pytest.ini @@ -0,0 +1,2 @@ +[pytest] +testpaths = tests diff --git a/openvino-genai-npu-worker/smoke_llm_npu.py b/openvino-genai-npu-worker/smoke_llm_npu.py index aba039f..533632c 100644 --- a/openvino-genai-npu-worker/smoke_llm_npu.py +++ b/openvino-genai-npu-worker/smoke_llm_npu.py @@ -10,31 +10,42 @@ import argparse import json import time from pathlib import Path - -import openvino_genai as ov_genai +from typing import Any DEFAULT_MODEL = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_CACHE = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") -def read_busy() -> int: - return int(BUSY_PATH.read_text().strip()) +def import_openvino_genai() -> Any: + import openvino_genai as ov_genai # type: ignore[import-not-found] + + return ov_genai + + +def read_busy(path: Path = BUSY_PATH) -> int: + return int(path.read_text().strip()) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--model", default=DEFAULT_MODEL) parser.add_argument("--cache-dir", default=DEFAULT_CACHE) - parser.add_argument("--prompt", default="Write a concise title for: User asked Atlas to summarize NPU worker options.") + parser.add_argument("--busy-path", default=str(BUSY_PATH)) + parser.add_argument("--prompt", default="Write a concise title for: Synthetic NPU worker contract smoke with no routing changes.") parser.add_argument("--max-new-tokens", type=int, default=24) args = parser.parse_args() model_path = Path(args.model) cache_dir = Path(args.cache_dir) + busy_path = Path(args.busy_path) cache_dir.mkdir(parents=True, exist_ok=True) if not model_path.exists(): raise SystemExit(f"model path does not exist: {model_path}") + if not busy_path.exists(): + raise SystemExit(f"NPU busy-time counter does not exist: {busy_path}") + if args.max_new_tokens < 1 or args.max_new_tokens > 256: + raise SystemExit("max-new-tokens must be between 1 and 256") config = { "CACHE_DIR": str(cache_dir), @@ -44,15 +55,16 @@ def main() -> int: "GENERATE_HINT": "FAST_COMPILE", } - before = read_busy() + ov_genai = import_openvino_genai() + before = read_busy(busy_path) load_start = time.monotonic() - pipe = ov_genai.LLMPipeline(str(model_path), "NPU", config) + pipe = ov_genai.LLMPipeline(str(model_path), "NPU", **config) load_ms = round((time.monotonic() - load_start) * 1000, 2) gen_start = time.monotonic() output = pipe.generate(args.prompt, max_new_tokens=args.max_new_tokens) gen_ms = round((time.monotonic() - gen_start) * 1000, 2) - after = read_busy() + after = read_busy(busy_path) result = { "model": str(model_path), "device": "NPU", diff --git a/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service b/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service index 910c940..e2b530f 100644 --- a/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service +++ b/openvino-genai-npu-worker/systemd/openvino-genai-npu-worker.service @@ -7,6 +7,7 @@ Type=simple WorkingDirectory=/home/will/lab/swarm/openvino-genai-npu-worker Environment=OV_GENAI_NPU_MODEL=/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov Environment=OV_GENAI_NPU_CACHE=/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4 +Environment=OV_GENAI_NPU_HOST=127.0.0.1 Environment=OV_GENAI_NPU_PORT=18820 ExecStart=/home/will/.venvs/npu/bin/python /home/will/lab/swarm/openvino-genai-npu-worker/worker.py --host 127.0.0.1 --port 18820 Restart=on-failure diff --git a/openvino-genai-npu-worker/tests/test_worker.py b/openvino-genai-npu-worker/tests/test_worker.py new file mode 100644 index 0000000..413ec09 --- /dev/null +++ b/openvino-genai-npu-worker/tests/test_worker.py @@ -0,0 +1,131 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +import worker + + +class FakePipeline: + def __init__(self, model_path: str, device: str, config: dict[str, object], busy_path: Path, output: str = "Synthetic title"): + self.model_path = model_path + self.device = device + self.config = config + self.busy_path = busy_path + self.output = output + self.calls: list[tuple[str, int]] = [] + + def generate(self, prompt: str, *, max_new_tokens: int): + self.calls.append((prompt, max_new_tokens)) + before = int(self.busy_path.read_text().strip()) + self.busy_path.write_text(str(before + 1234)) + return self.output + + +class FakeGenAI: + def __init__(self, busy_path: Path, output: str = "Synthetic title"): + self.busy_path = busy_path + self.output = output + self.pipeline: FakePipeline | None = None + + def LLMPipeline(self, model_path: str, device: str, *args: object, **kwargs: object): # noqa: N802 - mirrors OpenVINO API + if args and isinstance(args[0], dict): + config: dict[str, object] = {str(k): v for k, v in args[0].items()} + else: + config = dict(kwargs) + self.pipeline = FakePipeline(model_path, device, config, self.busy_path, self.output) + return self.pipeline + + +@pytest.fixture() +def worker_paths(tmp_path: Path): + model_path = tmp_path / "model" + cache_dir = tmp_path / "cache" + busy_path = tmp_path / "npu_busy_time_us" + model_path.mkdir() + busy_path.write_text("100") + return model_path, cache_dir, busy_path + + +def test_generate_uses_npu_config_and_reports_busy_delta(monkeypatch: pytest.MonkeyPatch, worker_paths): + model_path, cache_dir, busy_path = worker_paths + fake_genai = FakeGenAI(busy_path) + monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai) + + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_port=18820) + result = npu_worker.generate("title", "Synthetic non-private kanban notification.", max_new_tokens=24) + + assert result.npu_busy_before_us == 100 + assert result.npu_busy_after_us == 1334 + assert result.npu_busy_delta_us == 1234 + assert result.text == "Synthetic title" + assert fake_genai.pipeline is not None + assert fake_genai.pipeline.device == "NPU" + assert fake_genai.pipeline.config["CACHE_DIR"] == str(cache_dir) + assert fake_genai.pipeline.config["MAX_PROMPT_LEN"] == 1024 + assert fake_genai.pipeline.calls[0][1] == 24 + + +def test_memory_alias_json_wrapping(monkeypatch: pytest.MonkeyPatch, worker_paths): + model_path, cache_dir, busy_path = worker_paths + fake_genai = FakeGenAI(busy_path, output='[{"fact":"synthetic stable preference","confidence":0.8}]') + monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai) + + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + result = npu_worker.generate("memory_candidate", "Synthetic user says they prefer concise answers.") + + assert result.parsed_json is not None + assert result.parsed_json["candidates"][0]["fact"] == "synthetic stable preference" + assert "wrapped" in result.parsed_json["notes"] + + +@pytest.mark.parametrize( + ("job", "user_input", "max_new_tokens", "message"), + [ + ("bad", "hello", 1, "unsupported job"), + ("title", "", 1, "non-empty"), + ("title", "x" * (worker.MAX_INPUT_CHARS + 1), 1, "input too long"), + ("title", "hello", worker.MAX_NEW_TOKENS + 1, "max_new_tokens"), + ], +) +def test_validation_errors(monkeypatch: pytest.MonkeyPatch, worker_paths, job: str, user_input: str, max_new_tokens: int, message: str): + model_path, cache_dir, busy_path = worker_paths + monkeypatch.setattr(worker, "import_openvino_genai", lambda: FakeGenAI(busy_path)) + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + + with pytest.raises(ValueError, match=message): + npu_worker.generate(job, user_input, max_new_tokens=max_new_tokens) + + +def test_health_reports_actual_bind_and_limits(worker_paths): + model_path, cache_dir, busy_path = worker_paths + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_host="127.0.0.1", bind_port=18821) + + health = npu_worker.health() + + assert health["bind"] == "127.0.0.1:18821" + assert health["max_input_chars"] == 6000 + assert health["max_new_tokens"] == 256 + assert health["busy_time_us"] == 100 + + +def test_response_payload_shape(worker_paths): + model_path, cache_dir, busy_path = worker_paths + npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path) + result = worker.GenerationResult( + text="ok", + parsed_json={"severity": "info"}, + timing_ms={"load": 1.0, "initial_load": 1.0, "generate": 2.0, "total": 3.0}, + npu_busy_delta_us=5, + npu_busy_before_us=10, + npu_busy_after_us=15, + ) + + payload = worker.response_payload(npu_worker, "notification", result) + + assert json.dumps(payload) + assert payload["device"] == "NPU" + assert payload["job"] == "notification" + assert payload["json"] == {"severity": "info"} diff --git a/openvino-genai-npu-worker/worker.py b/openvino-genai-npu-worker/worker.py index 9ec7ed8..2e11031 100644 --- a/openvino-genai-npu-worker/worker.py +++ b/openvino-genai-npu-worker/worker.py @@ -10,6 +10,7 @@ import argparse import json import os import re +import socket import threading import time from dataclasses import dataclass @@ -18,8 +19,6 @@ from pathlib import Path from typing import Any, cast from urllib.parse import urlparse -import openvino_genai as ov_genai # type: ignore[import-not-found] - MODEL_ID = "OpenVINO/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_MODEL_PATH = "/home/will/models/openvino-genai/Qwen2.5-1.5B-Instruct-int4-ov" DEFAULT_CACHE_DIR = "/home/will/.cache/openvino/genai-npu/qwen2.5-1.5b-int4" @@ -27,6 +26,14 @@ BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") HOST = "127.0.0.1" PORT = 18820 MAX_INPUT_CHARS = 6000 +MAX_NEW_TOKENS = 256 +GENAI_CONFIG = { + "CACHE_DIR": DEFAULT_CACHE_DIR, + "MAX_PROMPT_LEN": 1024, + "MIN_RESPONSE_LEN": 64, + "PREFILL_HINT": "DYNAMIC", + "GENERATE_HINT": "FAST_COMPILE", +} DEFAULTS = { "title": 32, "summary": 160, @@ -48,8 +55,20 @@ PROMPTS = { } -def read_busy() -> int: - return int(BUSY_PATH.read_text().strip()) +def import_openvino_genai() -> Any: + """Import OpenVINO GenAI lazily so unit tests do not require the NPU venv.""" + + import openvino_genai as ov_genai # type: ignore[import-not-found] + + return ov_genai + + +def listener_exists(host: str, port: int) -> bool: + """Return True when a TCP listener already accepts connections.""" + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(0.2) + return sock.connect_ex((host, port)) == 0 def coerce_json(text: str) -> Any | None: @@ -79,9 +98,20 @@ class GenerationResult: class NpuWorker: - def __init__(self, model_path: str, cache_dir: str): + def __init__( + self, + model_path: str, + cache_dir: str, + *, + busy_path: Path = BUSY_PATH, + bind_host: str = HOST, + bind_port: int = PORT, + ): self.model_path = Path(model_path) self.cache_dir = Path(cache_dir) + self.busy_path = Path(busy_path) + self.bind_host = bind_host + self.bind_port = bind_port self.cache_dir.mkdir(parents=True, exist_ok=True) self._pipe = None self._load_ms: float | None = None @@ -89,21 +119,20 @@ class NpuWorker: self._loaded_at: float | None = None if not self.model_path.exists(): raise FileNotFoundError(f"model path does not exist: {self.model_path}") + if not self.busy_path.exists(): + raise FileNotFoundError(f"NPU busy-time counter does not exist: {self.busy_path}") + + def read_busy(self) -> int: + return int(self.busy_path.read_text().strip()) def load(self) -> None: if self._pipe is not None: return start = time.monotonic() # NPU GenAI requires bounded prompt/response shapes; CACHE_DIR enables compiled blob caching. - self._pipe = ov_genai.LLMPipeline( - str(self.model_path), - "NPU", - CACHE_DIR=str(self.cache_dir), - MAX_PROMPT_LEN=1024, - MIN_RESPONSE_LEN=64, - PREFILL_HINT="DYNAMIC", - GENERATE_HINT="FAST_COMPILE", - ) + ov_genai = import_openvino_genai() + config = GENAI_CONFIG | {"CACHE_DIR": str(self.cache_dir)} + self._pipe = ov_genai.LLMPipeline(str(self.model_path), "NPU", **config) self._load_ms = round((time.monotonic() - start) * 1000, 2) self._loaded_at = time.time() @@ -115,19 +144,19 @@ class NpuWorker: if len(user_input) > MAX_INPUT_CHARS: raise ValueError(f"input too long: {len(user_input)} chars > {MAX_INPUT_CHARS}") max_new_tokens = int(max_new_tokens or DEFAULTS[job]) - if max_new_tokens < 1 or max_new_tokens > 256: - raise ValueError("max_new_tokens must be between 1 and 256") + if max_new_tokens < 1 or max_new_tokens > MAX_NEW_TOKENS: + raise ValueError(f"max_new_tokens must be between 1 and {MAX_NEW_TOKENS}") prompt = PROMPTS[job].format(input=user_input.strip()) with self._lock: load_start = time.monotonic() self.load() load_ms = round((time.monotonic() - load_start) * 1000, 2) - before = read_busy() + before = self.read_busy() gen_start = time.monotonic() pipe = cast(Any, self._pipe) text = str(pipe.generate(prompt, max_new_tokens=max_new_tokens)).strip() generate_ms = round((time.monotonic() - gen_start) * 1000, 2) - after = read_busy() + after = self.read_busy() parsed = coerce_json(text) if job in {"memory_candidate", "notification"} else None if job == "memory_candidate" and isinstance(parsed, list): parsed = {"candidates": parsed, "notes": "model returned a top-level array; worker wrapped it to preserve the API contract"} @@ -151,10 +180,11 @@ class NpuWorker: "loaded": self._pipe is not None, "initial_load_ms": self._load_ms, "loaded_at": self._loaded_at, - "busy_time_us": read_busy(), + "busy_time_us": self.read_busy(), "max_input_chars": MAX_INPUT_CHARS, + "max_new_tokens": MAX_NEW_TOKENS, "jobs": sorted(PROMPTS), - "bind": f"{HOST}:{PORT}", + "bind": f"{self.bind_host}:{self.bind_port}", } @@ -175,7 +205,7 @@ def response_payload(worker: NpuWorker, job: str, result: GenerationResult) -> d def make_handler(worker: NpuWorker): class Handler(BaseHTTPRequestHandler): - server_version = "openvino-genai-npu-worker/0.1" + server_version = "openvino-genai-npu-worker/0.2" def log_message(self, format: str, *args: Any) -> None: # Log only method/path/status metadata, not raw request bodies. @@ -215,7 +245,12 @@ def make_handler(worker: NpuWorker): if job == "memory": job = "memory_candidate" result = worker.generate(job, str(payload.get("input", "")), payload.get("max_new_tokens")) - self.send_json(200, response_payload(worker, job, result)) + body = response_payload(worker, job, result) + if result.npu_busy_delta_us <= 0: + body["error"] = "NPU busy-time counter did not increase during generation" + self.send_json(503, body) + return + self.send_json(200, body) except Exception as exc: self.send_json(400, {"error": str(exc)}) @@ -226,21 +261,24 @@ def cli(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="OpenVINO GenAI NPU worker") parser.add_argument("--model-path", default=os.environ.get("OV_GENAI_NPU_MODEL", DEFAULT_MODEL_PATH)) parser.add_argument("--cache-dir", default=os.environ.get("OV_GENAI_NPU_CACHE", DEFAULT_CACHE_DIR)) - parser.add_argument("--host", default=HOST) + parser.add_argument("--host", default=os.environ.get("OV_GENAI_NPU_HOST", HOST)) parser.add_argument("--port", type=int, default=int(os.environ.get("OV_GENAI_NPU_PORT", PORT))) parser.add_argument("--job", choices=sorted(PROMPTS), help="Run one CLI job instead of serving HTTP") parser.add_argument("--input", help="Input text for --job") parser.add_argument("--max-new-tokens", type=int) args = parser.parse_args(argv) - worker = NpuWorker(args.model_path, args.cache_dir) + if args.host != "127.0.0.1": + raise SystemExit("Refusing non-local bind without code change/explicit approval") + + worker = NpuWorker(args.model_path, args.cache_dir, bind_host=args.host, bind_port=args.port) if args.job: result = worker.generate(args.job, args.input or "", args.max_new_tokens) print(json.dumps(response_payload(worker, args.job, result), indent=2)) return 0 if result.npu_busy_delta_us > 0 else 2 - if args.host != "127.0.0.1": - raise SystemExit("Refusing non-local bind without code change/explicit approval") + if listener_exists(args.host, args.port): + raise SystemExit(f"Refusing to start: listener already exists on {args.host}:{args.port}") server = ThreadingHTTPServer((args.host, args.port), make_handler(worker)) print(f"serving {MODEL_ID} on http://{args.host}:{args.port}; raw prompts are not logged") server.serve_forever()