132 lines
5.1 KiB
Python
132 lines
5.1 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
import worker
|
|
|
|
|
|
class FakePipeline:
|
|
def __init__(self, model_path: str, device: str, config: dict[str, object], busy_path: Path, output: str = "Synthetic title"):
|
|
self.model_path = model_path
|
|
self.device = device
|
|
self.config = config
|
|
self.busy_path = busy_path
|
|
self.output = output
|
|
self.calls: list[tuple[str, int]] = []
|
|
|
|
def generate(self, prompt: str, *, max_new_tokens: int):
|
|
self.calls.append((prompt, max_new_tokens))
|
|
before = int(self.busy_path.read_text().strip())
|
|
self.busy_path.write_text(str(before + 1234))
|
|
return self.output
|
|
|
|
|
|
class FakeGenAI:
|
|
def __init__(self, busy_path: Path, output: str = "Synthetic title"):
|
|
self.busy_path = busy_path
|
|
self.output = output
|
|
self.pipeline: FakePipeline | None = None
|
|
|
|
def LLMPipeline(self, model_path: str, device: str, *args: object, **kwargs: object): # noqa: N802 - mirrors OpenVINO API
|
|
if args and isinstance(args[0], dict):
|
|
config: dict[str, object] = {str(k): v for k, v in args[0].items()}
|
|
else:
|
|
config = dict(kwargs)
|
|
self.pipeline = FakePipeline(model_path, device, config, self.busy_path, self.output)
|
|
return self.pipeline
|
|
|
|
|
|
@pytest.fixture()
|
|
def worker_paths(tmp_path: Path):
|
|
model_path = tmp_path / "model"
|
|
cache_dir = tmp_path / "cache"
|
|
busy_path = tmp_path / "npu_busy_time_us"
|
|
model_path.mkdir()
|
|
busy_path.write_text("100")
|
|
return model_path, cache_dir, busy_path
|
|
|
|
|
|
def test_generate_uses_npu_config_and_reports_busy_delta(monkeypatch: pytest.MonkeyPatch, worker_paths):
|
|
model_path, cache_dir, busy_path = worker_paths
|
|
fake_genai = FakeGenAI(busy_path)
|
|
monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai)
|
|
|
|
npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_port=18820)
|
|
result = npu_worker.generate("title", "Synthetic non-private kanban notification.", max_new_tokens=24)
|
|
|
|
assert result.npu_busy_before_us == 100
|
|
assert result.npu_busy_after_us == 1334
|
|
assert result.npu_busy_delta_us == 1234
|
|
assert result.text == "Synthetic title"
|
|
assert fake_genai.pipeline is not None
|
|
assert fake_genai.pipeline.device == "NPU"
|
|
assert fake_genai.pipeline.config["CACHE_DIR"] == str(cache_dir)
|
|
assert fake_genai.pipeline.config["MAX_PROMPT_LEN"] == 1024
|
|
assert fake_genai.pipeline.calls[0][1] == 24
|
|
|
|
|
|
def test_memory_alias_json_wrapping(monkeypatch: pytest.MonkeyPatch, worker_paths):
|
|
model_path, cache_dir, busy_path = worker_paths
|
|
fake_genai = FakeGenAI(busy_path, output='[{"fact":"synthetic stable preference","confidence":0.8}]')
|
|
monkeypatch.setattr(worker, "import_openvino_genai", lambda: fake_genai)
|
|
|
|
npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)
|
|
result = npu_worker.generate("memory_candidate", "Synthetic user says they prefer concise answers.")
|
|
|
|
assert result.parsed_json is not None
|
|
assert result.parsed_json["candidates"][0]["fact"] == "synthetic stable preference"
|
|
assert "wrapped" in result.parsed_json["notes"]
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
("job", "user_input", "max_new_tokens", "message"),
|
|
[
|
|
("bad", "hello", 1, "unsupported job"),
|
|
("title", "", 1, "non-empty"),
|
|
("title", "x" * (worker.MAX_INPUT_CHARS + 1), 1, "input too long"),
|
|
("title", "hello", worker.MAX_NEW_TOKENS + 1, "max_new_tokens"),
|
|
],
|
|
)
|
|
def test_validation_errors(monkeypatch: pytest.MonkeyPatch, worker_paths, job: str, user_input: str, max_new_tokens: int, message: str):
|
|
model_path, cache_dir, busy_path = worker_paths
|
|
monkeypatch.setattr(worker, "import_openvino_genai", lambda: FakeGenAI(busy_path))
|
|
npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)
|
|
|
|
with pytest.raises(ValueError, match=message):
|
|
npu_worker.generate(job, user_input, max_new_tokens=max_new_tokens)
|
|
|
|
|
|
def test_health_reports_actual_bind_and_limits(worker_paths):
|
|
model_path, cache_dir, busy_path = worker_paths
|
|
npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_host="127.0.0.1", bind_port=18821)
|
|
|
|
health = npu_worker.health()
|
|
|
|
assert health["bind"] == "127.0.0.1:18821"
|
|
assert health["max_input_chars"] == 6000
|
|
assert health["max_new_tokens"] == 256
|
|
assert health["busy_time_us"] == 100
|
|
|
|
|
|
def test_response_payload_shape(worker_paths):
|
|
model_path, cache_dir, busy_path = worker_paths
|
|
npu_worker = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)
|
|
result = worker.GenerationResult(
|
|
text="ok",
|
|
parsed_json={"severity": "info"},
|
|
timing_ms={"load": 1.0, "initial_load": 1.0, "generate": 2.0, "total": 3.0},
|
|
npu_busy_delta_us=5,
|
|
npu_busy_before_us=10,
|
|
npu_busy_after_us=15,
|
|
)
|
|
|
|
payload = worker.response_payload(npu_worker, "notification", result)
|
|
|
|
assert json.dumps(payload)
|
|
assert payload["device"] == "NPU"
|
|
assert payload["job"] == "notification"
|
|
assert payload["json"] == {"severity": "info"}
|