"""Tests for the NPU worker, run against an in-process fake of the GenAI module.

No real model or device is touched: ``worker.import_openvino_genai`` is
monkeypatched to return :class:`FakeGenAI`, and the NPU busy-time counter is a
plain text file living under pytest's ``tmp_path``.
"""

from __future__ import annotations

import json
from pathlib import Path

import pytest

import worker

# Amount the fake pipeline adds to the busy-time counter file on each
# ``generate`` call; the busy-delta assertions below rely on this value.
_BUSY_STEP_US = 1234


class FakePipeline:
    """Pipeline double that records calls and advances the busy counter file."""

    def __init__(
        self,
        model_path: str,
        device: str,
        config: dict[str, object],
        busy_path: Path,
        output: str = "Synthetic title",
    ):
        self.model_path = model_path
        self.device = device
        self.config = config
        self.busy_path = busy_path
        self.output = output
        # One ``(prompt, max_new_tokens)`` entry per ``generate`` invocation.
        self.calls: list[tuple[str, int]] = []

    def generate(self, prompt: str, *, max_new_tokens: int) -> str:
        """Log the call, bump the counter file, and return the canned output."""
        self.calls.append((prompt, max_new_tokens))
        busy_so_far = int(self.busy_path.read_text().strip())
        self.busy_path.write_text(str(busy_so_far + _BUSY_STEP_US))
        return self.output


class FakeGenAI:
    """Stand-in for the object returned by ``worker.import_openvino_genai``."""

    def __init__(self, busy_path: Path, output: str = "Synthetic title"):
        self.busy_path = busy_path
        self.output = output
        # The most recently constructed pipeline, kept so tests can inspect it.
        self.pipeline: FakePipeline | None = None

    def LLMPipeline(self, model_path: str, device: str, *args: object, **kwargs: object):  # noqa: N802 - mirrors OpenVINO API
        """Build a :class:`FakePipeline`, capturing whichever config form was used.

        The config is taken from the first extra positional argument when that
        argument is a dict (keys coerced to ``str``); otherwise the keyword
        arguments are used as the config.
        """
        first_extra = args[0] if args else None
        config: dict[str, object]
        if isinstance(first_extra, dict):
            config = {str(key): value for key, value in first_extra.items()}
        else:
            config = dict(kwargs)

        self.pipeline = FakePipeline(model_path, device, config, self.busy_path, self.output)
        return self.pipeline


@pytest.fixture
def worker_paths(tmp_path: Path) -> tuple[Path, Path, Path]:
    """Return ``(model_path, cache_dir, busy_path)`` rooted in ``tmp_path``.

    The model directory is created and the busy counter file is seeded with
    ``100``; the cache directory is only named, not created.
    """
    busy_path = tmp_path / "npu_busy_time_us"
    busy_path.write_text("100")

    model_path = tmp_path / "model"
    model_path.mkdir()

    return model_path, tmp_path / "cache", busy_path


def test_generate_uses_npu_config_and_reports_busy_delta(monkeypatch: pytest.MonkeyPatch, worker_paths):
    """A title job targets the NPU with the expected config and busy-time delta."""
    model_path, cache_dir, busy_path = worker_paths
    fake_genai = FakeGenAI(busy_path)

    def fake_import() -> FakeGenAI:
        return fake_genai

    monkeypatch.setattr(worker, "import_openvino_genai", fake_import)
    subject = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path, bind_port=18820)

    outcome = subject.generate("title", "Synthetic non-private kanban notification.", max_new_tokens=24)

    # Counter starts at 100 (fixture) and the fake adds 1234 per generate call.
    assert outcome.npu_busy_before_us == 100
    assert outcome.npu_busy_after_us == 1334
    assert outcome.npu_busy_delta_us == 1234
    assert outcome.text == "Synthetic title"

    pipeline = fake_genai.pipeline
    assert pipeline is not None
    assert pipeline.device == "NPU"
    assert pipeline.config["CACHE_DIR"] == str(cache_dir)
    assert pipeline.config["MAX_PROMPT_LEN"] == 1024
    # Second element of the recorded call tuple is ``max_new_tokens``.
    assert pipeline.calls[0][1] == 24


def test_memory_alias_json_wrapping(monkeypatch: pytest.MonkeyPatch, worker_paths):
    """A bare JSON array from a ``memory_candidate`` job comes back wrapped."""
    model_path, cache_dir, busy_path = worker_paths
    canned_output = '[{"fact":"synthetic stable preference","confidence":0.8}]'
    fake_genai = FakeGenAI(busy_path, output=canned_output)

    def fake_import() -> FakeGenAI:
        return fake_genai

    monkeypatch.setattr(worker, "import_openvino_genai", fake_import)
    subject = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)

    outcome = subject.generate("memory_candidate", "Synthetic user says they prefer concise answers.")

    parsed = outcome.parsed_json
    assert parsed is not None
    assert parsed["candidates"][0]["fact"] == "synthetic stable preference"
    assert "wrapped" in parsed["notes"]


@pytest.mark.parametrize(
    "job,user_input,max_new_tokens,message",
    [
        ("bad", "hello", 1, "unsupported job"),
        ("title", "", 1, "non-empty"),
        ("title", "x" * (worker.MAX_INPUT_CHARS + 1), 1, "input too long"),
        ("title", "hello", worker.MAX_NEW_TOKENS + 1, "max_new_tokens"),
    ],
)
def test_validation_errors(
    monkeypatch: pytest.MonkeyPatch,
    worker_paths,
    job: str,
    user_input: str,
    max_new_tokens: int,
    message: str,
):
    """Each invalid request raises ``ValueError`` whose text matches ``message``."""
    model_path, cache_dir, busy_path = worker_paths

    def fake_import() -> FakeGenAI:
        # A fresh fake per import call, exactly as the patched hook is invoked.
        return FakeGenAI(busy_path)

    monkeypatch.setattr(worker, "import_openvino_genai", fake_import)
    subject = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)

    with pytest.raises(ValueError, match=message):
        subject.generate(job, user_input, max_new_tokens=max_new_tokens)


def test_health_reports_actual_bind_and_limits(worker_paths):
    """``health()`` reports the bind address, the limits, and the busy counter."""
    model_path, cache_dir, busy_path = worker_paths
    subject = worker.NpuWorker(
        str(model_path),
        str(cache_dir),
        busy_path=busy_path,
        bind_host="127.0.0.1",
        bind_port=18821,
    )

    health = subject.health()

    expected_fields = {
        "bind": "127.0.0.1:18821",
        "max_input_chars": 6000,
        "max_new_tokens": 256,
        "busy_time_us": 100,  # value seeded by the ``worker_paths`` fixture
    }
    for field_name, expected_value in expected_fields.items():
        assert health[field_name] == expected_value


def test_response_payload_shape(worker_paths):
    """``response_payload`` is JSON-serialisable and carries device, job and json."""
    model_path, cache_dir, busy_path = worker_paths
    subject = worker.NpuWorker(str(model_path), str(cache_dir), busy_path=busy_path)
    timings = {"load": 1.0, "initial_load": 1.0, "generate": 2.0, "total": 3.0}
    outcome = worker.GenerationResult(
        text="ok",
        parsed_json={"severity": "info"},
        timing_ms=timings,
        npu_busy_delta_us=5,
        npu_busy_before_us=10,
        npu_busy_after_us=15,
    )

    payload = worker.response_payload(subject, "notification", outcome)

    # ``json.dumps`` raises on non-serialisable content; the result is truthy.
    assert json.dumps(payload)
    assert payload["device"] == "NPU"
    assert payload["job"] == "notification"
    assert payload["json"] == {"severity": "info"}