#!/usr/bin/env python3
"""End-to-end smoke test for the document/image triage CLI and HTTP server.

The script runs ``make_samples.py``, ``triage.py`` and ``server.py`` from
``ROOT`` (the parent of this file's directory) and checks their output with
plain ``assert`` statements. Because the checks are asserts, do not run it
with ``python -O``, which strips them.
"""
from __future__ import annotations

import http.client
import json
import socket
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
SAMPLES = ROOT / "samples"
BUSY = Path("/sys/class/accel/accel0/device/npu_busy_time_us")


def run(cmd: list[str]) -> None:
    """Echo *cmd* and run it from ROOT, raising CalledProcessError on failure."""
    print("+", " ".join(cmd))
    subprocess.run(cmd, cwd=ROOT, check=True)


def _json_request(url: str, payload: dict) -> urllib.request.Request:
    """Build a POST request carrying *payload* as a JSON body."""
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )


def post_json(url: str, payload: dict) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Non-2xx responses propagate as ``urllib.error.HTTPError``.
    """
    with urllib.request.urlopen(_json_request(url, payload), timeout=10) as resp:
        return json.loads(resp.read().decode())


def post_json_status(url: str, payload: dict) -> tuple[int, dict]:
    """POST *payload* as JSON and return ``(status, decoded_body)``.

    Unlike :func:`post_json`, HTTP error responses are returned rather than
    raised, so callers can assert on the status code and error body.
    """
    try:
        with urllib.request.urlopen(_json_request(url, payload), timeout=10) as resp:
            return resp.status, json.loads(resp.read().decode())
    except urllib.error.HTTPError as exc:
        # HTTPError wraps the response; close it once the body has been read.
        with exc:
            return exc.code, json.loads(exc.read().decode())


def busy(path: Path = BUSY) -> int | None:
    """Return the integer counter stored in *path*, or None if unavailable.

    Args:
        path: File holding a single integer; defaults to the ``BUSY`` path.

    Returns:
        The parsed integer, or None when the file cannot be read or does not
        contain an integer.
    """
    try:
        return int(path.read_text().strip())
    except (OSError, ValueError):
        # Best-effort probe: a missing, unreadable or malformed counter just
        # means the busy-time checks are skipped.
        return None


def choose_free_loopback_port() -> int:
    """Ask the OS for a free localhost port and verify it is not listening yet."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        port = int(sock.getsockname()[1])
    # The socket above is closed now; nothing should accept on that port yet.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        probe.settimeout(0.25)
        assert probe.connect_ex(("127.0.0.1", port)) != 0, f"selected port already has a listener: {port}"
    return port


def assert_loopback_bind_policy(timeout_s: float = 10.0) -> None:
    """Check that ``server.py`` refuses to start on ``0.0.0.0``.

    Args:
        timeout_s: How long to wait for the server to exit. If the refusal
            regresses the server would keep serving, so a timeout is turned
            into a failure instead of hanging the smoke test.

    Raises:
        AssertionError: If the server exits with status 0, does not mention
            "loopback" on stderr, or is still running after *timeout_s*.
    """
    try:
        blocked = subprocess.run(
            [sys.executable, "server.py", "--host", "0.0.0.0", "--port", "0", "--allowed-root", str(ROOT)],
            cwd=ROOT,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout_s,
        )
    except subprocess.TimeoutExpired as err:
        raise AssertionError(
            "server.py kept running when asked to bind 0.0.0.0; expected it to refuse and exit"
        ) from err
    assert blocked.returncode != 0, blocked.stdout + blocked.stderr
    assert "loopback" in blocked.stderr.lower(), blocked.stderr


def _check_cli_triage(invoice: Path, pdf: Path) -> tuple[dict, int | None, int | None]:
    """Run ``triage.py`` on the samples and validate the first file's result.

    Returns ``(first_result, busy_before, busy_after)``.
    """
    before = busy()
    raw = subprocess.check_output(
        [sys.executable, "triage.py", "--allowed-root", str(ROOT), "--pretty", str(invoice), str(pdf)],
        cwd=ROOT,
        text=True,
    )
    data = json.loads(raw)
    assert data["ok"], data

    first = data["files"][0]["result"]
    page = first["pages"][0]
    attention = page["needs_attention"]
    summary = first["processing_device_summary"]

    assert first["privacy"]["external_uploads"] is False
    assert page["classification"]["label"] == "bill_or_invoice"
    assert attention["value"] is True
    assert "amount_due" in attention["reasons"]
    assert summary["file_intake"] == "CPU"
    assert "NPU" in summary["needs_attention_embedding"] or attention["device"] == "CPU"

    after = busy()
    if before is not None and after is not None:
        # If :18817 is reachable and text was embedded, NPU delta must be positive.
        emb = attention["embedding"]
        if emb.get("used"):
            assert emb.get("verified_npu") is True, emb
            assert (emb.get("npu_busy_delta_us") or 0) > 0, emb
            assert after > before, {"before": before, "after": after, "embedding": emb}
    return first, before, after


def _wait_until_healthy(proc: subprocess.Popen, base_url: str, timeout_s: float = 5.0) -> None:
    """Poll ``/healthz`` until the expected banner appears or *timeout_s* elapses."""
    deadline = time.monotonic() + timeout_s  # monotonic: immune to clock jumps
    last_problem: object = None
    while time.monotonic() < deadline:
        if proc.poll() is not None:
            # No point polling a server that has already died.
            raise AssertionError(f"server exited before becoming ready (code {proc.returncode})")
        try:
            with urllib.request.urlopen(f"{base_url}/healthz", timeout=1) as resp:
                health = resp.read()
        except (OSError, http.client.HTTPException) as err:
            # Connection refused / reset / HTTP errors while the server starts.
            last_problem = err
        else:
            if b"openvino-doc-image-triage-npu" in health:
                return
            last_problem = f"unexpected /healthz body: {health[:200]!r}"
        time.sleep(0.1)
    raise AssertionError(f"server did not become ready (last problem: {last_problem!r})")


def _stop_server(proc: subprocess.Popen) -> None:
    """Terminate *proc*, escalating to kill if it ignores the request."""
    if proc.poll() is not None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()


def _check_http_contract(base_url: str, invoice: Path) -> None:
    """Exercise ``/triage`` and verify the request-body policy restrictions."""
    resp = post_json(f"{base_url}/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}})
    assert resp["ok"] is True, resp
    assert resp["result"]["source_path_basename"] == "synthetic_invoice.png"
    assert "source_path" not in resp["result"]

    # Request bodies may narrow but must not widen the startup --allowed-root policy.
    with tempfile.NamedTemporaryFile(suffix=".txt") as outside:
        outside.write(b"sensitive text outside configured artifact root")
        outside.flush()
        # Try to widen to the directory that actually holds the file; the
        # temp dir is not necessarily /tmp when TMPDIR is set.
        widened_root = str(Path(outside.name).parent)
        status, blocked = post_json_status(
            f"{base_url}/triage",
            {"path": outside.name, "options": {"allowed_roots": [widened_root], "dry_run": True, "use_embeddings": False}},
        )
    assert status == 400, blocked
    assert blocked["ok"] is False, blocked
    assert "allowed_roots" in blocked.get("message", ""), blocked

    # Request bodies must not redirect extracted text to caller-supplied endpoints.
    status, blocked = post_json_status(
        f"{base_url}/triage",
        {"path": str(invoice), "options": {"embedding_url": "http://198.51.100.1:9/v1/embeddings"}},
    )
    assert status == 400, blocked
    assert blocked["ok"] is False, blocked
    assert "embedding_url" in blocked.get("message", ""), blocked


def _check_http_server(invoice: Path) -> None:
    """Start ``server.py`` on a free loopback port and run the HTTP checks."""
    # HTTP smoke on a preflighted free localhost port so we do not collide with live/prototype ports.
    assert_loopback_bind_policy()
    smoke_port = choose_free_loopback_port()
    base_url = f"http://127.0.0.1:{smoke_port}"

    # Server output goes to a temp file rather than an undrained pipe, which
    # could fill up and block the server; the log is replayed on failure.
    with tempfile.TemporaryFile() as server_log:
        proc = subprocess.Popen(
            [sys.executable, "server.py", "--host", "127.0.0.1", "--port", str(smoke_port), "--allowed-root", str(ROOT)],
            cwd=ROOT,
            stdout=server_log,
            stderr=subprocess.STDOUT,
        )
        try:
            _wait_until_healthy(proc, base_url)
            _check_http_contract(base_url, invoice)
        except Exception:
            _stop_server(proc)
            server_log.seek(0)
            sys.stderr.write(server_log.read().decode(errors="replace"))
            raise
        finally:
            _stop_server(proc)


def main() -> int:
    """Generate samples, run the CLI and HTTP checks, and print a JSON summary.

    Returns:
        0 on success; any failed check raises instead of returning.
    """
    run([sys.executable, "make_samples.py"])
    invoice = SAMPLES / "synthetic_invoice.png"
    pdf = SAMPLES / "synthetic_invoice.pdf"

    first, before, after = _check_cli_triage(invoice, pdf)
    _check_http_server(invoice)

    page = first["pages"][0]
    print(json.dumps({
        "ok": True,
        "samples": sum(1 for _ in SAMPLES.glob("synthetic_*")),
        "npu_busy_before": before,
        "npu_busy_after": after,
        "npu_delta_observed": None if before is None or after is None else after - before,
        "triage_label": page["classification"]["label"],
        "needs_attention": page["needs_attention"]["value"],
    }, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())