feat: add OpenVINO NPU prototype services

This commit is contained in:
William Valentin
2026-06-04 11:41:55 -07:00
parent d67c259187
commit 5b01b1bd11
26 changed files with 2165 additions and 4 deletions
+159
View File
@@ -0,0 +1,159 @@
# OpenVINO NPU document/image triage prototype
Local-only prototype for triaging screenshots, photos/scans, and PDF page images.
It returns structured JSON metadata and explicitly reports CPU vs NPU stages.
Location: `/home/will/lab/swarm/openvino-doc-image-triage-npu/`
## Privacy and safety
- No external uploads.
- The only network call is optional localhost-only embeddings at `127.0.0.1:18817`.
- Raw OCR/sidecar text is redacted by default and is not logged.
- Full source paths are omitted by default; responses include basename and SHA-256.
- Allowed roots are enforced for CLI/server requests.
- This prototype does not mutate Obsidian, RAG, Chroma, vector collections, routing, or gateway services.
## CPU vs NPU stages
CPU:
- file intake, allowed-root checks, size checks, hashing
- image/PDF decoding/rendering and normalization
- optional local text extraction from sidecars or PDF text libraries
- regex metadata extraction and rule-based category fallback
- final needs-attention rules
NPU:
- needs-attention semantic embedding, via existing local OpenVINO embeddings service on `:18817`
- verified with `/sys/class/accel/accel0/device/npu_busy_time_us` before/after each embedding call
Not configured in v1:
- image category classifier on NPU. The JSON reports this as `CPU rule fallback (NPU model not configured in prototype v1)`. A future task can add a static-shape MobileNet/EfficientNet/ResNet OpenVINO IR model.
- OCR on NPU. OCR remains CPU/local plumbing in v1.
## Files
- `triage.py` — core library and CLI.
- `server.py` — stdlib HTTP server with `/healthz`, `/models`, `/triage`, `/triage/batch`.
- `make_samples.py` — creates synthetic non-private image/PDF samples.
- `tests/smoke_test.py` — end-to-end smoke test, including NPU busy-time verification when `:18817` is reachable.
- `samples/` — generated synthetic fixtures.
## Requirements
Use the existing NPU venv when available:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python -m pip install pillow
```
`pillow` is already present in the discovered `/home/will/.venvs/npu`. Optional local PDF text/rendering improves PDF support:
```bash
/home/will/.venvs/npu/bin/python -m pip install pypdf pypdfium2
```
The smoke tests do not require external services except the existing localhost `:18817` embeddings service for positive NPU verification.
## CLI usage
Generate synthetic samples:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python make_samples.py
```
Triage local files:
```bash
/home/will/.venvs/npu/bin/python triage.py \
--allowed-root /home/will/lab/swarm/openvino-doc-image-triage-npu \
--pretty \
samples/synthetic_invoice.png samples/synthetic_invoice.pdf
```
Disable the local NPU embeddings call if needed:
```bash
/home/will/.venvs/npu/bin/python triage.py --no-embeddings --allowed-root "$PWD" samples/synthetic_receipt.png
```
Include OCR/sidecar text in a single response only when explicitly requested:
```bash
/home/will/.venvs/npu/bin/python triage.py --include-ocr-text --allowed-root "$PWD" samples/synthetic_invoice.png
```
## HTTP usage
Check that port 18820 is free first:
```bash
ss -ltnp | grep ':18820\b' || true
```
Start local-only server:
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python server.py --host 127.0.0.1 --port 18820 --allowed-root "$PWD"
```
Call it:
```bash
curl -sS http://127.0.0.1:18820/healthz | jq
curl -sS http://127.0.0.1:18820/models | jq
curl -sS -X POST http://127.0.0.1:18820/triage \
-H 'Content-Type: application/json' \
-d '{"path":"/home/will/lab/swarm/openvino-doc-image-triage-npu/samples/synthetic_invoice.png","options":{"allowed_roots":["/home/will/lab/swarm/openvino-doc-image-triage-npu"]}}' | jq
```
## Smoke test
```bash
cd /home/will/lab/swarm/openvino-doc-image-triage-npu
/home/will/.venvs/npu/bin/python tests/smoke_test.py
```
Expected: JSON ending with `"ok": true`. If the embeddings service is up, the result should show positive NPU busy-time delta and each embedded page should report `verified_npu: true`.
## Example output shape
```json
{
"file_id": "sha256:...",
"source_path_basename": "synthetic_invoice.png",
"media_type": "image",
"page_count": 1,
"pages": [
{
"page_index": 0,
"classification": {
"label": "bill_or_invoice",
"confidence": 0.71,
"device": "CPU",
"method": "rule_based_fallback"
},
"needs_attention": {
"value": true,
"device": "NPU+CPU",
"reasons": ["amount_due", "due_date_present"],
"embedding": {"verified_npu": true, "npu_busy_delta_us": 12345}
},
"metadata": {"dates_count": 1, "amounts_count": 1, "raw_values_redacted": true},
"ocr": {"available": true, "device": "CPU"}
}
],
"processing_device_summary": {
"file_intake": "CPU",
"image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)",
"needs_attention_embedding": "NPU via local :18817",
"metadata_extraction": "CPU",
"npu_verified": true
},
"privacy": {"external_uploads": false, "raw_text_logged": false}
}
```
@@ -0,0 +1,69 @@
#!/usr/bin/env python3
from __future__ import annotations
from pathlib import Path
from PIL import Image, ImageDraw, ImageFilter
ROOT = Path(__file__).resolve().parent
SAMPLES = ROOT / "samples"
def make_doc(path: Path, lines: list[str], size=(900, 1200), rotate: int = 0, blur: bool = False) -> None:
    """Draw *lines* on a white page, save it to *path*, and write a text sidecar.

    The sidecar is written next to the image as ``<name><ext>.txt`` and holds
    the same lines joined by newlines. When requested, blur is applied first
    and rotation second.
    """
    page = Image.new("RGB", size, "white")
    pen = ImageDraw.Draw(page)
    page_w, page_h = size[0], size[1]

    # Text rows start at y=70 and advance 55 px per line.
    for row, content in enumerate(lines):
        pen.text((70, 70 + 55 * row), content, fill="black")
    text_bottom = 70 + 55 * len(lines)

    # Frame the text area, never extending past the bottom page margin.
    frame_bottom = min(page_h - 50, text_bottom + 30)
    pen.rectangle((55, 50, page_w - 55, frame_bottom), outline="gray", width=3)

    if blur:
        page = page.filter(ImageFilter.GaussianBlur(2.5))
    if rotate:
        page = page.rotate(rotate, expand=True, fillcolor="white")

    page.save(path)
    sidecar = path.with_suffix(path.suffix + ".txt")
    sidecar.write_text("\n".join(lines) + "\n")
def main() -> int:
    """Generate the synthetic image/PDF fixtures and their text sidecars.

    Returns:
        0 on success (used as the process exit status).
    """
    SAMPLES.mkdir(exist_ok=True)
    make_doc(SAMPLES / "synthetic_invoice.png", [
        "ACME Utilities Invoice",
        "Invoice No: INV-2026-0604",
        "Amount Due: $123.45",
        "Payment due 2026-06-30",
        "Please submit payment by the due date.",
    ])
    make_doc(SAMPLES / "synthetic_receipt.png", [
        "Neighborhood Store Receipt",
        "Subtotal $14.20",
        "Tax $1.42",
        "Total $15.62",
        "Thank you for shopping",
    ], size=(720, 1100), rotate=3)
    make_doc(SAMPLES / "synthetic_conversation.png", [
        "Messages with Alex",
        "Can you please respond by tomorrow?",
        "Need signature on the form before Friday.",
    ], size=(1200, 750))
    make_doc(SAMPLES / "synthetic_sensitive_form.png", [
        "Sample Government Form - Fake Data",
        "Applicant: Test Person",
        "SSN: 123-45-6789",
        "Signature required",
        "Submit by Jan 15, 2027",
    ], blur=False)
    make_doc(SAMPLES / "synthetic_blurry.png", [
        "Low resolution blurred sample",
        "No action required",
    ], size=(360, 250), blur=True)

    # PIL can save a simple local PDF from a synthetic page. This is non-private.
    # The context manager closes the PNG file handle once the PDF is written.
    with Image.open(SAMPLES / "synthetic_invoice.png") as source_png:
        source_png.convert("RGB").save(SAMPLES / "synthetic_invoice.pdf", "PDF")
    (SAMPLES / "synthetic_invoice.pdf.txt").write_text((SAMPLES / "synthetic_invoice.png.txt").read_text())
    print(f"wrote samples under {SAMPLES}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Binary file not shown.

After

Width:  |  Height:  |  Size: 4.5 KiB

@@ -0,0 +1,2 @@
Low resolution blurred sample
No action required
Binary file not shown.

After

Width:  |  Height:  |  Size: 9.1 KiB

@@ -0,0 +1,3 @@
Messages with Alex
Can you please respond by tomorrow?
Need signature on the form before Friday.
@@ -0,0 +1,5 @@
ACME Utilities Invoice
Invoice No: INV-2026-0604
Amount Due: $123.45
Payment due 2026-06-30
Please submit payment by the due date.
Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

@@ -0,0 +1,5 @@
ACME Utilities Invoice
Invoice No: INV-2026-0604
Amount Due: $123.45
Payment due 2026-06-30
Please submit payment by the due date.
Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

@@ -0,0 +1,5 @@
Neighborhood Store Receipt
Subtotal $14.20
Tax $1.42
Total $15.62
Thank you for shopping
Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

@@ -0,0 +1,5 @@
Sample Government Form - Fake Data
Applicant: Test Person
SSN: 123-45-6789
Signature required
Submit by Jan 15, 2027
+178
View File
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""Stdlib localhost HTTP wrapper for the triage prototype.
Endpoints:
- GET /healthz
- GET /models
- POST /triage JSON: {"path":"/local/file", "options": {...}}
- POST /triage/batch JSON: {"paths":["/local/file"], "options": {...}}
The server binds to 127.0.0.1 by default and accepts only local file paths under
configured allowed roots. It never uploads document/image contents externally.
"""
from __future__ import annotations
import argparse
import json
import os
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from triage import DEFAULT_EMBED_URL, TriageOptions, read_npu_busy, triage_batch, triage_file
def _roots_within_configured(requested_roots: list[Any], configured_roots: list[Path]) -> list[Path]:
"""Return request roots only when they narrow the startup allowlist."""
narrowed: list[Path] = []
configured = [root.expanduser().resolve() for root in configured_roots]
for raw in requested_roots:
candidate = Path(str(raw)).expanduser().resolve()
if any(candidate == root or candidate.is_relative_to(root) for root in configured):
narrowed.append(candidate)
else:
raise ValueError("requested allowed_roots must be within configured allowed roots")
return narrowed
def _validated_embedding_url(raw_url: Any) -> str:
"""Allow only the configured local loopback embeddings service."""
url = str(raw_url)
parsed = urlparse(url)
host = parsed.hostname or ""
if (
parsed.scheme == "http"
and host in {"127.0.0.1", "localhost", "::1"}
and (parsed.port or 80) == 18817
and parsed.path == "/v1/embeddings"
and not parsed.username
and not parsed.password
):
return url
raise ValueError("embedding_url override must target the configured local loopback embeddings service")
def make_options(payload: dict[str, Any], default_roots: list[Path]) -> TriageOptions:
    """Build ``TriageOptions`` from an untrusted request payload.

    Args:
        payload: decoded request JSON; its optional ``"options"`` object
            supplies per-request overrides.
        default_roots: allowed roots configured at server startup.

    Returns:
        Options whose allowed roots are either the startup roots or a
        narrowed subset of them, and whose embedding URL is the default or a
        validated loopback override.

    Raises:
        ValueError: if the payload or options are not JSON objects, if
            ``allowed_roots`` is not a list or widens the startup policy, if
            ``embedding_url`` is not the local service, or if ``max_pages``
            is negative.
    """
    if not isinstance(payload, dict):
        raise ValueError("request JSON must be an object")
    opts = payload.get("options") or {}
    if not isinstance(opts, dict):
        raise ValueError("options must be an object")

    requested_roots = opts.get("allowed_roots", [])
    if requested_roots:
        if not isinstance(requested_roots, list):
            raise ValueError("allowed_roots must be a list")
        roots = _roots_within_configured(requested_roots, default_roots)
    else:
        roots = default_roots

    embedding_url = DEFAULT_EMBED_URL
    if "embedding_url" in opts:
        embedding_url = _validated_embedding_url(opts["embedding_url"])

    # A negative page limit would become a negative slice / empty range downstream.
    max_pages = int(opts.get("max_pages", 3))
    if max_pages < 0:
        raise ValueError("max_pages must be non-negative")

    return TriageOptions(
        max_pages=max_pages,
        include_ocr_text=bool(opts.get("include_ocr_text", False)),
        dry_run=bool(opts.get("dry_run", False)),
        use_embeddings=bool(opts.get("use_embeddings", True)),
        embedding_url=embedding_url,
        allowed_roots=roots,
        include_full_path=bool(opts.get("include_full_path", False)),
    )
class Handler(BaseHTTPRequestHandler):
    """Request handler for the health, model-listing, and triage endpoints."""

    server_version = "openvino-doc-image-triage-npu/0.1"

    def _json(self, status: int, body: dict[str, Any]) -> None:
        """Send *body* as a JSON response with HTTP status *status*."""
        data = json.dumps(body, sort_keys=True).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)

    def log_message(self, format: str, *args: Any) -> None:
        # Do not log request bodies, OCR text, or file paths.
        return

    @property
    def allowed_roots(self) -> list[Path]:
        """Allowed roots attached to the server object at startup."""
        return self.server.allowed_roots  # type: ignore[attr-defined]

    def do_GET(self) -> None:  # noqa: N802
        """Serve the health document, the model listing, or a 404."""
        if self.path in ("/", "/healthz", "/health"):
            self._json(200, {
                "ok": True,
                "service": "openvino-doc-image-triage-npu",
                "bind_policy": "localhost-default",
                "npu_busy_time_us": read_npu_busy(),
                "npu_busy_check_enabled": True,
                "allowed_roots": [str(p) for p in self.allowed_roots],
                "privacy": {"external_uploads": False, "raw_text_logged": False},
            })
            return
        if self.path == "/models":
            self._json(200, {
                "models": [
                    {
                        "stage": "needs_attention_embedding",
                        "model": "bge-base-en-v1.5-int8-ov via local :18817",
                        "target_device": "NPU",
                        "verification": "sysfs npu_busy_time_us before/after embedding call",
                    },
                    {
                        "stage": "image_category_classification",
                        "model": "rule-based fallback in prototype v1",
                        "target_device": "CPU",
                        "npu_status": "not configured; future static-shape MobileNet/EfficientNet/ResNet OV IR",
                    },
                    {"stage": "ocr_text_extraction", "model": "optional local sidecar/PDF text", "target_device": "CPU"},
                ]
            })
            return
        self._json(404, {"ok": False, "error": "not_found"})

    def _read_payload(self) -> dict[str, Any]:
        """Read and decode the JSON request body.

        Returns:
            The decoded JSON object, or ``{}`` when the body is empty.

        Raises:
            ValueError: if Content-Length is negative or above 512 KiB, or
                if the body is not a JSON object.
        """
        length = int(self.headers.get("Content-Length", "0"))
        # read(-1) would block until the client closes the connection.
        if length < 0:
            raise ValueError("invalid Content-Length")
        if length > 512 * 1024:
            raise ValueError("request JSON too large")
        raw = self.rfile.read(length)
        if not raw:
            return {}
        payload = json.loads(raw.decode())
        if not isinstance(payload, dict):
            raise ValueError("request JSON must be an object")
        return payload

    def do_POST(self) -> None:  # noqa: N802
        """Handle ``/triage`` and ``/triage/batch``; any failure becomes a 400 JSON error."""
        try:
            payload = self._read_payload()
            options = make_options(payload, self.allowed_roots)
            if self.path == "/triage":
                path = payload.get("path")
                if not path:
                    self._json(400, {"ok": False, "error": "missing_path"})
                    return
                self._json(200, {"ok": True, "result": triage_file(path, options)})
                return
            if self.path == "/triage/batch":
                paths = payload.get("paths") or []
                if not isinstance(paths, list) or not paths:
                    self._json(400, {"ok": False, "error": "missing_paths"})
                    return
                self._json(200, triage_batch([str(p) for p in paths], options))
                return
            self._json(404, {"ok": False, "error": "not_found"})
        except Exception as exc:
            self._json(400, {"ok": False, "error": type(exc).__name__, "message": str(exc)})
def main() -> int:
    """Parse CLI arguments and serve triage requests until interrupted.

    Host and port default to ``DOC_IMAGE_TRIAGE_HOST`` / ``DOC_IMAGE_TRIAGE_PORT``
    (falling back to 127.0.0.1:18820). Allowed roots default to the current
    working directory when no ``--allowed-root`` is given.

    Returns:
        0 when the serve loop exits normally.
    """
    parser = argparse.ArgumentParser(description="Local-only doc/image triage HTTP server")
    parser.add_argument("--host", default=os.environ.get("DOC_IMAGE_TRIAGE_HOST", "127.0.0.1"))
    parser.add_argument("--port", type=int, default=int(os.environ.get("DOC_IMAGE_TRIAGE_PORT", "18820")))
    parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; may repeat")
    args = parser.parse_args()
    roots = [Path(p).expanduser().resolve() for p in args.allowed_root] or [Path.cwd().resolve()]

    # The context manager closes the listening socket when serving stops.
    with ThreadingHTTPServer((args.host, args.port), Handler) as httpd:
        httpd.allowed_roots = roots  # type: ignore[attr-defined]
        print(json.dumps({"service": "openvino-doc-image-triage-npu", "host": args.host, "port": args.port, "allowed_roots": [str(p) for p in roots]}), flush=True)
        httpd.serve_forever()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
@@ -0,0 +1,127 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.request
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SAMPLES = ROOT / "samples"
BUSY = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
def run(cmd: list[str]) -> None:
    """Echo *cmd* and execute it from the project root, raising on a non-zero exit."""
    rendered = " ".join(cmd)
    print("+", rendered)
    subprocess.run(cmd, check=True, cwd=ROOT)
def post_json(url: str, payload: dict) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    HTTP error statuses propagate as ``urllib.error.HTTPError``.
    """
    body = json.dumps(payload).encode()
    request = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(request, timeout=10) as response:
        raw = response.read()
        return json.loads(raw.decode())
def post_json_status(url: str, payload: dict) -> tuple[int, dict]:
    """POST *payload* as JSON and return ``(status, decoded_body)``.

    Unlike ``post_json``, HTTP error responses are returned rather than raised.
    """
    body = json.dumps(payload).encode()
    request = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            code = response.status
            raw = response.read()
    except urllib.error.HTTPError as http_err:
        code = http_err.code
        raw = http_err.read()
    return code, json.loads(raw.decode())
def busy() -> int | None:
    """Return the NPU busy-time counter from sysfs, or ``None`` if it cannot be read."""
    try:
        raw = BUSY.read_text()
        return int(raw.strip())
    except Exception:
        return None
def main() -> int:
    """Run the end-to-end smoke test: sample generation, CLI triage, then HTTP checks.

    Returns:
        0 when every assertion passes; an ``AssertionError`` propagates otherwise.
    """
    run([sys.executable, "make_samples.py"])
    invoice = SAMPLES / "synthetic_invoice.png"
    pdf = SAMPLES / "synthetic_invoice.pdf"

    before = busy()
    raw = subprocess.check_output([
        sys.executable, "triage.py", "--allowed-root", str(ROOT), "--pretty", str(invoice), str(pdf)
    ], cwd=ROOT, text=True)
    data = json.loads(raw)
    assert data["ok"], data
    first = data["files"][0]["result"]
    assert first["privacy"]["external_uploads"] is False
    assert first["pages"][0]["classification"]["label"] == "bill_or_invoice"
    assert first["pages"][0]["needs_attention"]["value"] is True
    assert "amount_due" in first["pages"][0]["needs_attention"]["reasons"]
    assert first["processing_device_summary"]["file_intake"] == "CPU"
    assert "NPU" in first["processing_device_summary"]["needs_attention_embedding"] or first["pages"][0]["needs_attention"]["device"] == "CPU"
    after = busy()
    if before is not None and after is not None:
        # If :18817 is reachable and text was embedded, NPU delta must be positive.
        emb = first["pages"][0]["needs_attention"]["embedding"]
        if emb.get("used"):
            assert emb.get("verified_npu") is True, emb
            assert (emb.get("npu_busy_delta_us") or 0) > 0, emb
            assert after > before, {"before": before, "after": after, "embedding": emb}

    # HTTP smoke on a dedicated test port (18828) so we do not collide with 18820.
    # Server output is discarded: nothing reads it, and an unread pipe could
    # fill up and block the child process.
    proc = subprocess.Popen(
        [sys.executable, "server.py", "--host", "127.0.0.1", "--port", "18828", "--allowed-root", str(ROOT)],
        cwd=ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    try:
        deadline = time.time() + 5
        while time.time() < deadline:
            try:
                with urllib.request.urlopen("http://127.0.0.1:18828/healthz", timeout=1) as health_resp:
                    health = health_resp.read()
                assert b"openvino-doc-image-triage-npu" in health
                break
            except Exception:
                # Server not ready yet (or unexpected body); retry until the deadline.
                time.sleep(0.1)
        else:
            raise AssertionError("server did not become ready")

        resp = post_json("http://127.0.0.1:18828/triage", {"path": str(invoice), "options": {"allowed_roots": [str(ROOT)]}})
        assert resp["ok"] is True, resp
        assert resp["result"]["source_path_basename"] == "synthetic_invoice.png"
        assert "source_path" not in resp["result"]

        # Request bodies may narrow but must not widen the startup --allowed-root policy.
        with tempfile.NamedTemporaryFile(suffix=".txt") as outside:
            outside.write(b"sensitive text outside configured artifact root")
            outside.flush()
            status, blocked = post_json_status(
                "http://127.0.0.1:18828/triage",
                {"path": outside.name, "options": {"allowed_roots": ["/tmp"], "dry_run": True, "use_embeddings": False}},
            )
            assert status == 400, blocked
            assert blocked["ok"] is False, blocked
            assert "allowed_roots" in blocked.get("message", ""), blocked

        # Request bodies must not redirect extracted text to caller-supplied endpoints.
        status, blocked = post_json_status(
            "http://127.0.0.1:18828/triage",
            {"path": str(invoice), "options": {"embedding_url": "http://198.51.100.1:9/v1/embeddings"}},
        )
        assert status == 400, blocked
        assert blocked["ok"] is False, blocked
        assert "embedding_url" in blocked.get("message", ""), blocked
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Do not leave a stray server behind if it ignores SIGTERM.
            proc.kill()
            proc.wait()

    print(json.dumps({
        "ok": True,
        "samples": len(list(SAMPLES.glob("synthetic_*"))),
        "npu_busy_before": before,
        "npu_busy_after": after,
        "npu_delta_observed": None if before is None or after is None else after - before,
        "triage_label": first["pages"][0]["classification"]["label"],
        "needs_attention": first["pages"][0]["needs_attention"]["value"],
    }, indent=2))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
+459
View File
@@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""Local-only document/image triage prototype.
CPU stages:
- local file intake, hashing, MIME/extension checks
- image/PDF-page decoding and normalization
- optional sidecar/native-text extraction
- regex metadata extraction and rule-based category fallback
NPU stages:
- needs-attention semantic embedding via the existing local OpenVINO NPU
embeddings service on 127.0.0.1:18817, verified by sysfs busy-time delta.
No external uploads are performed. The only network call is localhost to the
embedding service when enabled.
"""
from __future__ import annotations
import argparse
import base64
import dataclasses
import datetime as dt
import hashlib
import io
import json
import mimetypes
import os
import re
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
try:
from PIL import Image, ImageOps
except Exception as exc: # pragma: no cover - caught in CLI smoke
raise SystemExit("Pillow is required: install pillow in the active Python env") from exc
# sysfs counter read before/after embedding calls to compute an NPU busy-time delta.
NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
# Default embeddings endpoint on loopback.
DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
# Evaluated once at import time: the working directory of the importing process.
DEFAULT_ALLOWED_ROOTS = [Path.cwd()]
# Files larger than this (25 MiB) are rejected by triage_file.
MAX_FILE_BYTES = 25 * 1024 * 1024
# Full label vocabulary reported as "candidate_labels" in classification results.
CATEGORY_LABELS = [
"receipt",
"bill_or_invoice",
"tax_or_financial",
"medical_or_insurance",
"legal_or_government",
"form_or_application",
"travel_or_ticket",
"screenshot_conversation",
"screenshot_web_or_app",
"identity_or_sensitive",
"photo_misc",
"unknown_or_low_confidence",
]
# Date shapes, in order: year-first numeric, month-first numeric, and "Mon D, YYYY".
# Each pattern exposes the matched date as group 1.
DATE_PATTERNS = [
re.compile(r"\b(20\d{2}[-/](?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01]))\b"),
re.compile(r"\b((?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01])[-/](?:20)?\d{2})\b"),
re.compile(r"\b((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+20\d{2})\b", re.I),
]
# Dollar amounts with an optional "USD" prefix; no capture groups, so findall yields whole matches.
AMOUNT_RE = re.compile(r"(?<!\w)(?:USD\s*)?\$\s?\d{1,4}(?:,\d{3})*(?:\.\d{2})?\b", re.I)
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b")
PHONE_RE = re.compile(r"\b(?:\+?1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){2}\d{4}\b")
# A labelled identifier such as "Invoice No: INV-2026-0604".
ACCOUNT_RE = re.compile(r"\b(?:account|acct|policy|invoice|member|case|claim)\s*(?:#|no\.?|id)?\s*[:\-]?\s*[A-Z0-9-]{4,}\b", re.I)
# Matches the ###-##-#### shape only; no validity check is performed.
SSN_LIKE_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
# Reason code -> lowercase substrings; any single hit in the lowercased text adds the reason.
ATTENTION_KEYWORDS = {
"due_date_present": ["due", "payment due", "pay by", "deadline"],
"amount_due": ["amount due", "balance due", "total due", "$"],
"action_required_language": ["action required", "please respond", "complete", "submit", "renew", "verify"],
"signature_required": ["signature", "sign and return", "signed"],
"appointment_or_deadline": ["appointment", "scheduled", "reservation", "hearing"],
"account_security": ["security", "password", "unauthorized", "fraud", "verify your account"],
"medical_followup": ["follow up", "lab result", "referral", "insurance"],
"tax_deadline": ["irs", "tax", "1099", "w-2", "deadline"],
}
# Category label -> lowercase substrings; classify_rule scores a label by how many of them occur.
# Dict order matters: on a tie the earlier label wins.
CATEGORY_KEYWORDS = {
"receipt": ["receipt", "subtotal", "cashier", "change", "store"],
"bill_or_invoice": ["invoice", "amount due", "balance due", "statement", "payment due"],
"tax_or_financial": ["tax", "irs", "1099", "w-2", "bank", "routing"],
"medical_or_insurance": ["medical", "insurance", "clinic", "patient", "claim"],
"legal_or_government": ["court", "government", "department", "notice", "license"],
"form_or_application": ["application", "form", "signature", "submit"],
"travel_or_ticket": ["boarding", "ticket", "itinerary", "reservation", "gate"],
"screenshot_conversation": ["message", "chat", "reply", "conversation"],
"screenshot_web_or_app": ["login", "browser", "app", "settings", "dashboard"],
"identity_or_sensitive": ["ssn", "passport", "driver license", "social security"],
}
@dataclasses.dataclass
class TriageOptions:
    """Per-request knobs for ``triage_file`` / ``triage_batch``."""

    # Upper bound on PDF pages rendered and text-extracted.
    max_pages: int = 3
    # When true, the extracted text is included in the result as "ocr_text".
    include_ocr_text: bool = False
    # When true, image/PDF page decoding is skipped.
    dry_run: bool = False
    # When false, the embeddings service is not called.
    use_embeddings: bool = True
    embedding_url: str = DEFAULT_EMBED_URL
    # Each instance gets its own copy of the module-level default list.
    allowed_roots: list[Path] = dataclasses.field(default_factory=lambda: list(DEFAULT_ALLOWED_ROOTS))
    # When true, the resolved path is included in the result as "source_path".
    include_full_path: bool = False
    # Timeout passed to the embeddings HTTP call.
    timeout_seconds: float = 10.0
def read_npu_busy() -> int | None:
    """Return the integer stored in ``NPU_BUSY_PATH``, or ``None`` when it cannot be read or parsed."""
    try:
        raw = NPU_BUSY_PATH.read_text()
        return int(raw.strip())
    except Exception:
        return None
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*, read in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while block := stream.read(1024 * 1024):
            digest.update(block)
    return digest.hexdigest()
def under_allowed_root(path: Path, roots: list[Path]) -> bool:
    """Return True when the resolved *path* equals or lies beneath any resolved root."""
    target = path.resolve()
    return any(target.is_relative_to(root.resolve()) for root in roots)
def sidecar_text(path: Path) -> tuple[str, str | None]:
    """Return text from a sidecar file next to *path*, if one exists.

    Candidates are tried in order: ``<name><ext>.txt`` (e.g. ``a.png.txt``),
    then ``<name>.txt``. Text is capped at 12000 characters.

    Returns:
        ``(text, "sidecar:<filename>")`` for the first readable candidate,
        ``("", "sidecar_unreadable")`` if that candidate cannot be read, or
        ``("", None)`` when no sidecar exists.
    """
    candidates: list[Path] = []
    for suffix in (path.suffix + ".txt", ".txt"):
        candidate = path.with_suffix(suffix)
        # A suffix-less path yields the same candidate twice; probe it once.
        if candidate not in candidates:
            candidates.append(candidate)

    for candidate in candidates:
        if not candidate.is_file():
            continue
        try:
            text = candidate.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return "", "sidecar_unreadable"
        return text[:12000], f"sidecar:{candidate.name}"
    return "", None
def extract_pdf_text(path: Path, max_pages: int) -> tuple[str, str | None]:
    """Extract text from the first *max_pages* pages of a PDF using optional ``pypdf``.

    Returns:
        ``(text, "pypdf_cpu")`` on success (text capped at 12000 characters),
        or ``("", <status>)`` where the status names why nothing was
        extracted: missing dependency, encrypted PDF, or the error type.
    """
    # Optional dependency; tests do not require it. Keeps PDF support local-only when installed.
    try:
        import pypdf  # type: ignore
    except Exception:
        return "", "pypdf_not_installed"

    try:
        reader = pypdf.PdfReader(str(path))
        if getattr(reader, "is_encrypted", False):
            return "", "pdf_encrypted"
        page_texts = [page.extract_text() or "" for page in reader.pages[:max_pages]]
        joined = "\n".join(page_texts)
        return joined[:12000], "pypdf_cpu"
    except Exception as exc:
        return "", f"pdf_text_error:{type(exc).__name__}"
def load_image_pages(path: Path, max_pages: int) -> tuple[list[Image.Image], str | None]:
    """Decode *path* into a list of RGB page images.

    PDFs are rendered with optional ``pypdfium2`` (at most *max_pages* pages,
    scale 1.5). Any other file is opened as a single image with EXIF
    orientation applied.

    Returns:
        ``(pages, None)`` on success, or ``([], <error status>)`` when the
        renderer is missing or decoding fails.
    """
    ext = path.suffix.lower()
    if ext == ".pdf":
        try:
            import pypdfium2 as pdfium  # type: ignore
        except Exception:
            return [], "pypdfium2_not_installed"
        try:
            pdf = pdfium.PdfDocument(str(path))
            pages = []
            for i in range(min(len(pdf), max_pages)):
                bitmap = pdf[i].render(scale=1.5)
                pages.append(bitmap.to_pil().convert("RGB"))
            return pages, None
        except Exception as exc:
            return [], f"pdf_render_error:{type(exc).__name__}"
    try:
        # convert() returns a fully loaded copy, so the source file handle can
        # be closed as soon as the block exits.
        with Image.open(path) as source:
            img = ImageOps.exif_transpose(source).convert("RGB")
        return [img], None
    except Exception as exc:
        return [], f"image_decode_error:{type(exc).__name__}"
def normalize_for_hash_features(img: Image.Image) -> dict[str, Any]:
    """Compute coarse luminance statistics and the aspect ratio of *img*.

    The image copy is fitted within 224x224 and converted to grayscale before
    the histogram-based mean and standard deviation are taken. The aspect
    ratio uses the original dimensions.
    """
    thumb = ImageOps.contain(img.copy(), (224, 224)).convert("L")
    counts = thumb.histogram()
    total = max(1, thumb.width * thumb.height)
    mean = sum(level * count for level, count in enumerate(counts)) / total
    variance = sum(((level - mean) ** 2) * count for level, count in enumerate(counts)) / total
    aspect = img.width / max(1, img.height)
    return {
        "mean_luma": round(mean, 2),
        "contrast": round(variance ** 0.5, 2),
        "aspect_ratio": round(aspect, 3),
    }
def classify_rule(text: str, image_features: dict[str, Any]) -> dict[str, Any]:
    """Pick a category label by counting keyword hits in *text*.

    The label with the most keyword substrings present wins (first wins on
    ties). With no hits at all, a wide aspect ratio (> 1.3) is treated as a
    web/app screenshot; otherwise the result is unknown.
    """
    lowered = text.lower()
    label, hits = "unknown_or_low_confidence", 0
    for candidate, keywords in CATEGORY_KEYWORDS.items():
        count = len([kw for kw in keywords if kw in lowered])
        if count > hits:
            label, hits = candidate, count

    # Image-shape fallback applies only when the text gave no signal.
    if hits == 0 and image_features.get("aspect_ratio", 1.0) > 1.3:
        label, hits = "screenshot_web_or_app", 1

    if hits:
        confidence = min(0.35 + 0.18 * hits, 0.92)
    else:
        confidence = 0.2
    if confidence < 0.45:
        label = "unknown_or_low_confidence"

    return {
        "label": label,
        "confidence": round(confidence, 3),
        "device": "CPU",
        "stage": "category_classification",
        "method": "rule_based_fallback",
        "npu_status": "not_configured_for_prototype_v1",
        "candidate_labels": CATEGORY_LABELS,
    }
def extract_metadata(text: str) -> dict[str, Any]:
    """Summarize dates, amounts, and entity-presence flags found in *text*.

    Only counts and booleans are returned; matched values are never included.
    """
    found_dates = {m.group(1) for pattern in DATE_PATTERNS for m in pattern.finditer(text)}
    found_amounts = set(AMOUNT_RE.findall(text))

    org_hit = re.search(r"\b(?:inc|llc|clinic|department|bank|insurance|store)\b", text, re.I)
    address_hit = re.search(r"\b\d{2,5}\s+[A-Za-z0-9 .]+\s+(?:st|street|ave|avenue|rd|road|blvd|drive|dr)\b", text, re.I)
    entities = {
        "org_present": bool(org_hit),
        "address_present": bool(address_hit),
        "phone_present": bool(PHONE_RE.search(text)),
        "email_present": bool(EMAIL_RE.search(text)),
        "policy_or_account_id_present": bool(ACCOUNT_RE.search(text)),
        "identity_number_like_present": bool(SSN_LIKE_RE.search(text)),
    }
    return {
        "dates_count": len(found_dates),
        "amounts_count": len(found_amounts),
        "detected_entities": entities,
        "raw_values_redacted": True,
    }
def call_embeddings(text: str, url: str, timeout: float) -> dict[str, Any]:
    """Embed *text* via the embeddings service and report the NPU busy-time delta.

    Args:
        text: text to embed; only the first 2048 characters are sent.
        url: embeddings endpoint to POST to.
        timeout: HTTP timeout in seconds.

    Returns:
        A status dict. ``used`` is True only when the service responded and
        its body parsed; ``verified_npu`` is True only when the sysfs
        busy-time counter increased across the call. Transport, decoding,
        and parsing failures are reported in ``status`` instead of raised.
    """
    import http.client
    from urllib.parse import urlsplit

    if not text.strip():
        return {"used": False, "device": "NPU", "status": "skipped_no_text", "npu_busy_delta_us": 0}

    # Report the endpoint actually called; fall back to the documented default.
    endpoint = urlsplit(url).netloc or "127.0.0.1:18817"
    before = read_npu_busy()
    payload = json.dumps({"input": text[:2048], "purpose": "document"}).encode()
    req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
    t0 = time.perf_counter()
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read(1024 * 1024)
            status = resp.status
        parsed = json.loads(body.decode())
        dim = None
        # Tolerate unexpected response shapes instead of raising.
        data = parsed.get("data") if isinstance(parsed, dict) else None
        if isinstance(data, list) and data and isinstance(data[0], dict):
            emb = data[0].get("embedding", [])
            dim = len(emb) if isinstance(emb, list) else None
        after = read_npu_busy()
        delta = (after - before) if before is not None and after is not None else None
        return {
            "used": True,
            "device": "NPU",
            "status": "ok" if status == 200 else f"http_{status}",
            "embedding_dim": dim,
            "wall_ms": round((time.perf_counter() - t0) * 1000, 2),
            "npu_busy_delta_us": delta,
            "verified_npu": bool(delta and delta > 0),
            "endpoint": endpoint,
        }
    except (OSError, ValueError, http.client.HTTPException) as exc:
        # OSError covers URLError, timeouts, and connection resets while reading;
        # ValueError covers JSON and UTF-8 decoding errors.
        after = read_npu_busy()
        delta = (after - before) if before is not None and after is not None else None
        return {
            "used": False,
            "device": "NPU",
            "status": f"embedding_service_error:{type(exc).__name__}",
            "npu_busy_delta_us": delta,
            "verified_npu": False,
            "endpoint": endpoint,
        }
def needs_attention(text: str, embedding_result: dict[str, Any]) -> dict[str, Any]:
    """Decide whether *text* needs attention, using keyword and metadata rules.

    Reasons come from keyword hits plus any detected amounts or dates. The
    embedding result only nudges confidence (when NPU-verified) and selects
    the reported device/method labels.
    """
    lowered = text.lower()
    triggered = {
        reason
        for reason, words in ATTENTION_KEYWORDS.items()
        if any(word in lowered for word in words)
    }
    meta = extract_metadata(text)
    if meta["amounts_count"]:
        triggered.add("amount_due")
    if meta["dates_count"]:
        triggered.add("due_date_present")
    reasons = sorted(triggered)

    value = bool(reasons)
    if value:
        confidence = min(0.45 + 0.1 * len(reasons), 0.9)
    else:
        confidence = 0.35
    if embedding_result.get("verified_npu"):
        confidence = min(confidence + 0.05, 0.95)

    embedding_used = embedding_result.get("used")
    return {
        "value": value,
        "confidence": round(confidence, 3),
        "reasons": reasons or (["low_confidence"] if not text.strip() else []),
        "device": "NPU+CPU" if embedding_used else "CPU",
        "stage": "needs_attention",
        "method": "NPU embedding verification + CPU rules" if embedding_used else "CPU rules fallback",
        "embedding": embedding_result,
    }
def infer_media_type(path: Path, is_pdf_page: bool = False) -> str:
    """Classify *path* as ``pdf_page``, ``pdf``, ``image``, or ``unknown`` from its name."""
    if is_pdf_page:
        return "pdf_page"
    guessed = mimetypes.guess_type(path.name)[0]
    if path.suffix.lower() == ".pdf":
        return "pdf"
    return "image" if guessed and guessed.startswith("image/") else "unknown"
def triage_file(path_like: str | Path, options: TriageOptions | None = None) -> dict[str, Any]:
    """Triage one local image or PDF and return a structured metadata record.

    Args:
        path_like: path to the file; must resolve under an allowed root.
        options: triage settings; defaults to ``TriageOptions()``.

    Returns:
        A dict with the file hash, per-page classification / needs-attention /
        metadata records, a processing-device summary, privacy flags, and any
        non-fatal decode errors. Text-derived fields are computed once per
        file and shared by every page record.

    Raises:
        ValueError: if the path is outside the allowed roots or the file
            exceeds ``MAX_FILE_BYTES``.
        FileNotFoundError: if the path does not refer to an existing file.
    """
    options = options or TriageOptions()
    path = Path(path_like).expanduser()
    resolved = path.resolve()
    if not under_allowed_root(resolved, options.allowed_roots):
        raise ValueError(f"path is outside allowed roots: {path}")
    if not resolved.exists() or not resolved.is_file():
        raise FileNotFoundError(str(path))
    size = resolved.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"file too large for prototype limit: {size} bytes")
    file_hash = sha256_file(resolved)
    is_pdf = resolved.suffix.lower() == ".pdf"

    # Sidecar text wins; native PDF text is consulted only when no sidecar text exists.
    text, text_source = sidecar_text(resolved)
    pdf_text_status = None
    if is_pdf and not text:
        text, pdf_text_status = extract_pdf_text(resolved, options.max_pages)
        text_source = pdf_text_status

    # Dry runs skip decoding entirely; a failed decode leaves images empty and
    # still yields a file-level record below.
    images: list[Image.Image] = []
    render_error = None
    if not options.dry_run:
        images, render_error = load_image_pages(resolved, options.max_pages)

    if options.use_embeddings:
        embedding_result = call_embeddings(text, options.embedding_url, options.timeout_seconds)
    else:
        embedding_result = {"used": False, "device": "NPU", "status": "disabled", "npu_busy_delta_us": 0, "verified_npu": False}
    attn = needs_attention(text, embedding_result)
    meta = extract_metadata(text)
    page_media_type = infer_media_type(resolved, is_pdf)

    def page_record(index: int, image_info: dict[str, Any], features: dict[str, Any]) -> dict[str, Any]:
        # One page entry; only the image info and classification features vary per page.
        return {
            "page_index": index,
            "media_type": page_media_type,
            "image": image_info,
            "classification": classify_rule(text, features),
            "needs_attention": attn,
            "metadata": meta,
            "ocr": {"available": bool(text), "quality": 0.7 if text else 0.0, "device": "CPU", "text_source": text_source},
        }

    pages: list[dict[str, Any]] = []
    if images:
        for idx, img in enumerate(images):
            features = normalize_for_hash_features(img)
            image_info = {
                "width": img.width,
                "height": img.height,
                "orientation": "portrait" if img.height >= img.width else "landscape",
                **features,
            }
            pages.append(page_record(idx, image_info, features))
    else:
        # No decoded pages: emit a single placeholder page carrying the render error.
        placeholder = {"width": None, "height": None, "orientation": None, "render_error": render_error}
        pages.append(page_record(0, placeholder, {"aspect_ratio": 1.0}))

    result: dict[str, Any] = {
        "file_id": f"sha256:{file_hash}",
        "source_path_basename": resolved.name,
        "media_type": infer_media_type(resolved),
        "file_size_bytes": size,
        "page_count": len(pages),
        "pages": pages,
        "processing_device_summary": {
            "file_intake": "CPU",
            "pdf_rendering": "CPU" if is_pdf else "not_applicable",
            "image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)",
            "ocr_text_extraction": "CPU/local sidecar or optional local PDF text extractor",
            "needs_attention_embedding": "NPU via local :18817" if embedding_result.get("used") else "CPU fallback/no text",
            "metadata_extraction": "CPU",
            "npu_verified": bool(embedding_result.get("verified_npu")),
            "npu_busy_delta_us": embedding_result.get("npu_busy_delta_us"),
        },
        "privacy": {
            "external_uploads": False,
            "localhost_only_embedding_call": bool(options.use_embeddings),
            "raw_text_logged": False,
            "raw_values_redacted": True,
            "full_path_included": options.include_full_path,
        },
        # The PDF text status counts as an error only when it produced no text.
        "errors": [e for e in [render_error, pdf_text_status if pdf_text_status and not text else None] if e],
    }
    if options.include_full_path:
        result["source_path"] = str(resolved)
    if options.include_ocr_text:
        result["ocr_text"] = text
    return result
def triage_batch(paths: list[str], options: TriageOptions | None = None) -> dict[str, Any]:
    """Triage each path independently and collect per-file results.

    A failure on one file is recorded as an ``ok: False`` item (basename,
    error type, message) and does not stop the batch.

    Returns:
        ``{"ok": <all files succeeded>, "files": [...], "generated_at": <UTC ISO timestamp>}``.
    """
    items = []
    for p in paths:
        try:
            items.append({"ok": True, "result": triage_file(p, options)})
        except Exception as exc:
            items.append({"ok": False, "source_path_basename": Path(p).name, "error": type(exc).__name__, "message": str(exc)})
    # timezone.utc yields the same output as dt.UTC but also works before Python 3.11.
    generated_at = dt.datetime.now(dt.timezone.utc).isoformat()
    return {"ok": all(item["ok"] for item in items), "files": items, "generated_at": generated_at}
def cli() -> int:
    """Command-line entry point: triage the given paths and print JSON to stdout.

    Returns:
        0 when every file was triaged successfully, 2 otherwise.
    """
    parser = argparse.ArgumentParser(description="Local document/image triage prototype")
    parser.add_argument("paths", nargs="+", help="local image/PDF paths")
    parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; defaults to cwd")
    parser.add_argument("--max-pages", type=int, default=3)
    for plain_flag in ("--include-ocr-text", "--include-full-path"):
        parser.add_argument(plain_flag, action="store_true")
    parser.add_argument("--no-embeddings", action="store_true", help="disable local NPU embedding call")
    for plain_flag in ("--dry-run", "--pretty"):
        parser.add_argument(plain_flag, action="store_true")
    args = parser.parse_args()

    # With no --allowed-root given, only the current directory is permitted.
    roots = [Path(p) for p in args.allowed_root] or [Path.cwd()]
    options = TriageOptions(
        max_pages=args.max_pages,
        include_ocr_text=args.include_ocr_text,
        dry_run=args.dry_run,
        use_embeddings=not args.no_embeddings,
        allowed_roots=roots,
        include_full_path=args.include_full_path,
    )
    out = triage_batch(args.paths, options)
    indent = 2 if args.pretty else None
    print(json.dumps(out, indent=indent, sort_keys=True))
    if out["ok"]:
        return 0
    return 2


if __name__ == "__main__":
    raise SystemExit(cli())