Files
swarm-master/openvino-genai-npu-worker/tests/test_worker.py
2026-06-04 13:07:51 -07:00

132 lines
5.1 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
import pytest
import worker
class FakePipeline:
    """Test double for the pipeline object built by ``FakeGenAI.LLMPipeline``.

    It keeps the constructor arguments for later inspection, records every
    ``generate`` call, and bumps the integer stored in ``busy_path`` so that
    callers reading that file before and after generation see a delta.
    """

    def __init__(
        self,
        model_path: str,
        device: str,
        config: dict[str, object],
        busy_path: Path,
        output: str = "Synthetic title",
    ):
        # One (prompt, max_new_tokens) tuple is appended per generate() call.
        self.calls: list[tuple[str, int]] = []
        self.output = output
        self.busy_path = busy_path
        self.config = config
        self.device = device
        self.model_path = model_path

    def generate(self, prompt: str, *, max_new_tokens: int):
        """Record the call, add 1234 to the counter file, and return ``self.output``."""
        self.calls.append((prompt, max_new_tokens))
        counter_file = self.busy_path
        # The file holds a single integer; surrounding whitespace is tolerated.
        previous_value = int(counter_file.read_text().strip())
        updated_value = previous_value + 1234
        counter_file.write_text(str(updated_value))
        return self.output
class FakeGenAI:
    """Test double for the object returned by ``worker.import_openvino_genai``.

    Only the ``LLMPipeline`` factory is provided. The most recently created
    ``FakePipeline`` is kept on ``self.pipeline`` so tests can inspect the
    device, config, and recorded calls.
    """

    def __init__(self, busy_path: Path, output: str = "Synthetic title"):
        self.busy_path = busy_path
        self.output = output
        # Set by LLMPipeline(); stays None until a pipeline has been built.
        self.pipeline: FakePipeline | None = None

    def LLMPipeline(self, model_path: str, device: str, *args: object, **kwargs: object):  # noqa: N802 - mirrors OpenVINO API
        """Build a ``FakePipeline`` and remember it on ``self.pipeline``.

        Properties may arrive as a positional dict, as keyword arguments, or
        both. The two sources are merged so that nothing the caller passes is
        silently dropped; previously keyword arguments were ignored whenever a
        positional dict was present.

        Args:
            model_path: Forwarded unchanged to ``FakePipeline``.
            device: Forwarded unchanged to ``FakePipeline``.
            *args: If the first extra positional argument is a dict it is
                treated as the config mapping (keys coerced to ``str``).
            **kwargs: Additional config properties.

        Returns:
            The newly created ``FakePipeline``.
        """
        config: dict[str, object] = {}
        if args and isinstance(args[0], dict):
            config.update((str(key), value) for key, value in args[0].items())
        # Keyword properties win on key clashes.
        # NOTE(review): confirm this precedence matches the real binding.
        config.update(kwargs)
        self.pipeline = FakePipeline(model_path, device, config, self.busy_path, self.output)
        return self.pipeline
@pytest.fixture()
def worker_paths(tmp_path: Path):
    """Provide ``(model_path, cache_dir, busy_path)`` under pytest's ``tmp_path``.

    ``model_path`` is created as a directory, ``busy_path`` is a file seeded
    with the text ``"100"``, and ``cache_dir`` is only a path (this fixture
    does not create it).
    """
    busy_path = tmp_path / "npu_busy_time_us"
    busy_path.write_text("100")

    model_path = tmp_path / "model"
    model_path.mkdir()

    cache_dir = tmp_path / "cache"
    return model_path, cache_dir, busy_path
def test_generate_uses_npu_config_and_reports_busy_delta(monkeypatch: pytest.MonkeyPatch, worker_paths):
    """A ``title`` job builds the pipeline for NPU and reports the busy-counter delta."""
    model_dir, cache_dir, busy_file = worker_paths
    genai = FakeGenAI(busy_file)
    # Make the worker receive the fake instead of importing the real package.
    monkeypatch.setattr(worker, "import_openvino_genai", lambda: genai)

    subject = worker.NpuWorker(
        str(model_dir),
        str(cache_dir),
        busy_path=busy_file,
        bind_port=18820,
    )
    outcome = subject.generate(
        "title",
        "Synthetic non-private kanban notification.",
        max_new_tokens=24,
    )

    # The fixture seeds the counter with 100 and FakePipeline.generate adds 1234.
    assert outcome.npu_busy_before_us == 100
    assert outcome.npu_busy_after_us == 1334
    assert outcome.npu_busy_delta_us == 1234
    assert outcome.text == "Synthetic title"

    pipeline = genai.pipeline
    assert pipeline is not None
    assert pipeline.device == "NPU"
    assert pipeline.config["CACHE_DIR"] == str(cache_dir)
    assert pipeline.config["MAX_PROMPT_LEN"] == 1024
    # Each recorded call is (prompt, max_new_tokens); only the token cap is checked.
    _, requested_tokens = pipeline.calls[0]
    assert requested_tokens == 24
def test_memory_alias_json_wrapping(monkeypatch: pytest.MonkeyPatch, worker_paths):
    """A bare JSON list from a ``memory_candidate`` job ends up under ``candidates``.

    The model output here is a top-level JSON array; the parsed result is
    expected to expose it as ``parsed_json["candidates"]`` and to mention
    "wrapped" in ``parsed_json["notes"]``.
    """
    model_dir, cache_dir, busy_file = worker_paths
    raw_model_output = '[{"fact":"synthetic stable preference","confidence":0.8}]'
    genai = FakeGenAI(busy_file, output=raw_model_output)
    monkeypatch.setattr(worker, "import_openvino_genai", lambda: genai)

    subject = worker.NpuWorker(str(model_dir), str(cache_dir), busy_path=busy_file)
    outcome = subject.generate(
        "memory_candidate",
        "Synthetic user says they prefer concise answers.",
    )

    assert outcome.parsed_json is not None
    first_candidate = outcome.parsed_json["candidates"][0]
    assert first_candidate["fact"] == "synthetic stable preference"
    assert "wrapped" in outcome.parsed_json["notes"]
@pytest.mark.parametrize(
    ("job", "user_input", "max_new_tokens", "message"),
    [
        # Job name that the worker is expected to reject.
        ("bad", "hello", 1, "unsupported job"),
        # Empty input.
        ("title", "", 1, "non-empty"),
        # One character past the input limit.
        ("title", "x" * (worker.MAX_INPUT_CHARS + 1), 1, "input too long"),
        # One token past the generation limit.
        ("title", "hello", worker.MAX_NEW_TOKENS + 1, "max_new_tokens"),
    ],
)
def test_validation_errors(
    monkeypatch: pytest.MonkeyPatch,
    worker_paths,
    job: str,
    user_input: str,
    max_new_tokens: int,
    message: str,
):
    """Each invalid request raises ``ValueError`` whose text matches ``message``."""
    model_dir, cache_dir, busy_file = worker_paths
    # A fresh fake is handed out on every import call.
    monkeypatch.setattr(worker, "import_openvino_genai", lambda: FakeGenAI(busy_file))
    subject = worker.NpuWorker(str(model_dir), str(cache_dir), busy_path=busy_file)

    # ``match`` is applied as a regular-expression search against the error text.
    with pytest.raises(ValueError, match=message):
        subject.generate(job, user_input, max_new_tokens=max_new_tokens)
def test_health_reports_actual_bind_and_limits(worker_paths):
    """``health()`` echoes the configured bind address, the limits, and the busy counter."""
    model_dir, cache_dir, busy_file = worker_paths
    subject = worker.NpuWorker(
        str(model_dir),
        str(cache_dir),
        busy_path=busy_file,
        bind_host="127.0.0.1",
        bind_port=18821,
    )

    report = subject.health()

    assert report["bind"] == "127.0.0.1:18821"
    # Limits are pinned to literal values rather than read back from ``worker``.
    assert report["max_input_chars"] == 6000
    assert report["max_new_tokens"] == 256
    # The fixture seeds the counter file with "100".
    assert report["busy_time_us"] == 100
def test_response_payload_shape(worker_paths):
    """``response_payload`` yields a JSON-serialisable dict with device, job, and json keys."""
    model_dir, cache_dir, busy_file = worker_paths
    subject = worker.NpuWorker(str(model_dir), str(cache_dir), busy_path=busy_file)

    timings = {"load": 1.0, "initial_load": 1.0, "generate": 2.0, "total": 3.0}
    parsed = {"severity": "info"}
    outcome = worker.GenerationResult(
        text="ok",
        parsed_json=parsed,
        timing_ms=timings,
        npu_busy_delta_us=5,
        npu_busy_before_us=10,
        npu_busy_after_us=15,
    )

    body = worker.response_payload(subject, "notification", outcome)

    # json.dumps raises on non-serialisable content and returns a non-empty string otherwise.
    assert json.dumps(body)
    assert body["device"] == "NPU"
    assert body["job"] == "notification"
    assert body["json"] == {"severity": "info"}