Files
2026-06-04 13:07:51 -07:00

155 lines
6.2 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import json
import socket
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path
# Directory one level above the folder containing this script. It is used as
# the working directory for every subprocess and passed as --allowed-root.
ROOT = Path(__file__).resolve().parents[1]
# Sample documents used by the smoke test live under ROOT/samples.
SAMPLES = ROOT / "samples"
# sysfs file that busy() parses as an integer. Presumably the cumulative NPU
# busy time in microseconds (going by the file name) — confirm with the driver.
BUSY = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def run(cmd: list[str]) -> None:
    """Echo *cmd* and execute it with ROOT as the working directory.

    The command is printed with a leading ``+`` before it runs. Because
    ``check=True`` is used, a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    rendered = " ".join(cmd)
    print("+", rendered)
    subprocess.run(cmd, check=True, cwd=ROOT)
def post_json(url: str, payload: dict) -> dict:
    """POST *payload* as a JSON body to *url* and return the decoded JSON reply.

    The request uses a 10 second timeout. Errors raised by ``urlopen`` are not
    handled here and propagate to the caller.
    """
    body = json.dumps(payload).encode()
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as reply:
        raw = reply.read()
    return json.loads(raw.decode())
def post_json_status(url: str, payload: dict) -> tuple[int, dict]:
    """POST *payload* as JSON to *url* and return ``(status, decoded_body)``.

    An ``urllib.error.HTTPError`` is not propagated: its status code and JSON
    body are returned in the same shape as a successful reply, so callers can
    assert on rejected requests. The request uses a 10 second timeout.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    try:
        reply = urllib.request.urlopen(request, timeout=10)
    except urllib.error.HTTPError as http_err:
        # Error replies still carry a JSON body; decode it like a normal one.
        return http_err.code, json.loads(http_err.read().decode())
    with reply:
        return reply.status, json.loads(reply.read().decode())
def busy() -> int | None:
    """Return the integer stored in the BUSY sysfs file, or ``None``.

    Any failure to read or parse the file (missing file, no permission,
    non-numeric content) is treated as "counter unavailable".
    """
    try:
        raw_counter = BUSY.read_text()
        reading = int(raw_counter.strip())
    except Exception:
        return None
    return reading
def choose_free_loopback_port() -> int:
    """Return an OS-assigned 127.0.0.1 TCP port that currently has no listener.

    The port is obtained by binding a throwaway socket to port 0. After that
    socket is closed, a short connect probe confirms that nothing is accepting
    connections on the port yet.

    Raises:
        AssertionError: if the probe manages to connect to the chosen port.
    """
    loopback = "127.0.0.1"

    # Binding to port 0 makes the OS pick an unused port for us.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as reserver:
        reserver.bind((loopback, 0))
        port = int(reserver.getsockname()[1])

    # The reserving socket is closed by now, so a successful connect would
    # mean some other process is already listening on this port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as checker:
        checker.settimeout(0.25)
        connect_result = checker.connect_ex((loopback, port))

    assert connect_result != 0, f"selected port already has a listener: {port}"
    return port
def assert_loopback_bind_policy(timeout: float = 30.0) -> None:
    """Check that ``server.py`` refuses to bind a non-loopback address.

    Launches ``server.py --host 0.0.0.0 --port 0`` from ROOT and expects the
    process to exit with a non-zero status and mention "loopback" on stderr.

    Args:
        timeout: Seconds to wait for the server to reject the bind. Without a
            limit, a server that accepted ``0.0.0.0`` would start serving and
            this check would hang forever instead of failing.

    Raises:
        AssertionError: if the server keeps running past *timeout*, exits with
            status 0, or does not mention "loopback" in its stderr output.
    """
    cmd = [
        sys.executable, "server.py",
        "--host", "0.0.0.0",
        "--port", "0",
        "--allowed-root", str(ROOT),
    ]
    try:
        blocked = subprocess.run(
            cmd,
            cwd=ROOT,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        # subprocess.run() has already killed the child at this point; a
        # server still alive after the timeout means the bind was accepted.
        raise AssertionError(
            f"server.py was still running after {timeout}s with --host 0.0.0.0 "
            "instead of rejecting the non-loopback bind"
        ) from exc
    assert blocked.returncode != 0, blocked.stdout + blocked.stderr
    assert "loopback" in blocked.stderr.lower(), blocked.stderr
def _wait_until_ready(proc: subprocess.Popen, base_url: str, timeout: float = 5.0) -> None:
    """Poll ``/healthz`` until the server reports its service name.

    Raises:
        AssertionError: if the server process exits before becoming ready, or
            if no matching health reply arrives within *timeout* seconds.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        exit_code = proc.poll()
        if exit_code is not None:
            # No point polling a dead server until the deadline expires.
            raise AssertionError(f"server exited early with code {exit_code}")
        try:
            with urllib.request.urlopen(f"{base_url}/healthz", timeout=1) as resp:
                health = resp.read()
        except Exception:
            # Best effort: connection refused/reset while the server is still
            # starting is expected, so every failure is simply retried.
            health = b""
        if b"openvino-doc-image-triage-npu" in health:
            return
        time.sleep(0.1)
    raise AssertionError("server did not become ready")


def _stop_server(proc: subprocess.Popen) -> tuple[str, str]:
    """Terminate the server (escalating to kill) and return ``(stdout, stderr)``."""
    proc.terminate()
    try:
        # communicate() drains and closes both pipes while waiting for exit.
        return proc.communicate(timeout=5)
    except subprocess.TimeoutExpired:
        # The server ignored SIGTERM; do not leave it running after the test.
        proc.kill()
        return proc.communicate()


def main() -> int:
    """Run the end-to-end smoke test and print a JSON summary.

    Steps: regenerate the sample documents, run ``triage.py`` on the synthetic
    invoice image and PDF, check the first result (and the NPU busy counter
    when it is readable), then start ``server.py`` on a free loopback port and
    exercise its ``/healthz`` and ``/triage`` endpoints, including two requests
    that must be rejected with HTTP 400.

    Returns:
        0 when every check passes.

    Raises:
        AssertionError: when any check fails.
        subprocess.CalledProcessError: when a helper script exits non-zero.
    """
    run([sys.executable, "make_samples.py"])
    invoice = SAMPLES / "synthetic_invoice.png"
    pdf = SAMPLES / "synthetic_invoice.pdf"

    before = busy()
    raw = subprocess.check_output(
        [sys.executable, "triage.py", "--allowed-root", str(ROOT), "--pretty", str(invoice), str(pdf)],
        cwd=ROOT,
        text=True,
    )
    data = json.loads(raw)
    assert data["ok"], data

    first = data["files"][0]["result"]
    first_page = first["pages"][0]
    attention = first_page["needs_attention"]
    devices = first["processing_device_summary"]
    assert first["privacy"]["external_uploads"] is False
    assert first_page["classification"]["label"] == "bill_or_invoice"
    assert attention["value"] is True
    assert "amount_due" in attention["reasons"]
    assert devices["file_intake"] == "CPU"
    assert "NPU" in devices["needs_attention_embedding"] or attention["device"] == "CPU"

    after = busy()
    if before is not None and after is not None:
        # If :18817 is reachable and text was embedded, NPU delta must be positive.
        emb = attention["embedding"]
        if emb.get("used"):
            assert emb.get("verified_npu") is True, emb
            assert (emb.get("npu_busy_delta_us") or 0) > 0, emb
            assert after > before, {"before": before, "after": after, "embedding": emb}

    # HTTP smoke on a preflighted free localhost port so we do not collide with live/prototype ports.
    assert_loopback_bind_policy()
    smoke_port = choose_free_loopback_port()
    base_url = f"http://127.0.0.1:{smoke_port}"
    proc = subprocess.Popen(
        [sys.executable, "server.py", "--host", "127.0.0.1", "--port", str(smoke_port), "--allowed-root", str(ROOT)],
        cwd=ROOT,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    passed = False
    try:
        _wait_until_ready(proc, base_url)

        resp = post_json(f"{base_url}/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}})
        assert resp["ok"] is True, resp
        assert resp["result"]["source_path_basename"] == "synthetic_invoice.png"
        assert "source_path" not in resp["result"]

        # Request bodies may narrow but must not widen the startup --allowed-root policy.
        # NOTE(review): assumes the temporary file is created outside ROOT.
        with tempfile.NamedTemporaryFile(suffix=".txt") as outside:
            outside.write(b"sensitive text outside configured artifact root")
            outside.flush()
            status, blocked = post_json_status(
                f"{base_url}/triage",
                {"path": outside.name, "options": {"allowed_roots": ["/tmp"], "dry_run": True, "use_embeddings": False}},
            )
            assert status == 400, blocked
            assert blocked["ok"] is False, blocked
            assert "allowed_roots" in blocked.get("message", ""), blocked

        # Request bodies must not redirect extracted text to caller-supplied endpoints.
        status, blocked = post_json_status(
            f"{base_url}/triage",
            {"path": str(invoice), "options": {"embedding_url": "http://198.51.100.1:9/v1/embeddings"}},
        )
        assert status == 400, blocked
        assert blocked["ok"] is False, blocked
        assert "embedding_url" in blocked.get("message", ""), blocked
        passed = True
    finally:
        _, server_stderr = _stop_server(proc)
        if not passed and server_stderr:
            # Surface the server's own diagnostics next to the failing check.
            print(server_stderr, file=sys.stderr)

    print(json.dumps({
        "ok": True,
        "samples": sum(1 for _ in SAMPLES.glob("synthetic_*")),
        "npu_busy_before": before,
        "npu_busy_after": after,
        "npu_delta_observed": None if before is None or after is None else after - before,
        "triage_label": first_page["classification"]["label"],
        "needs_attention": attention["value"],
    }, indent=2))
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status when run as a script.
    sys.exit(main())