Files
swarm-master/openvino-doc-image-triage-npu/triage.py
T
2026-06-04 13:07:51 -07:00

460 lines
18 KiB
Python

#!/usr/bin/env python3
"""Local-only document/image triage prototype.
CPU stages:
- local file intake, hashing, MIME/extension checks
- image/PDF-page decoding and normalization
- optional sidecar/native-text extraction
- regex metadata extraction and rule-based category fallback
NPU stages:
- needs-attention semantic embedding via the existing local OpenVINO NPU
embeddings service on 127.0.0.1:18817, verified by sysfs busy-time delta.
No external uploads are performed. The only network call is localhost to the
embedding service when enabled.
"""
from __future__ import annotations

import argparse
import base64
import dataclasses
import datetime as dt
import hashlib
import http.client
import io
import ipaddress
import json
import mimetypes
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
try:
from PIL import Image, ImageOps
except Exception as exc: # pragma: no cover - caught in CLI smoke
raise SystemExit("Pillow is required: install pillow in the active Python env") from exc
# Sysfs counter read before/after each embedding request; a positive delta is
# taken as evidence that the NPU did work (the "_us" suffix suggests the unit
# is microseconds).
NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
# Local OpenVINO embeddings endpoint used when embeddings are enabled.
DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings"
# NOTE: evaluated once at import time, i.e. this is the importing process's
# cwd, not the cwd at the moment TriageOptions is constructed.
DEFAULT_ALLOWED_ROOTS = [Path.cwd()]
# Per-file size cap (25 MiB) enforced before hashing or decoding.
MAX_FILE_BYTES = 25 * 1024 * 1024
# Label vocabulary reported to callers as "candidate_labels".
CATEGORY_LABELS = [
    "receipt",
    "bill_or_invoice",
    "tax_or_financial",
    "medical_or_insurance",
    "legal_or_government",
    "form_or_application",
    "travel_or_ticket",
    "screenshot_conversation",
    "screenshot_web_or_app",
    "identity_or_sensitive",
    "photo_misc",
    "unknown_or_low_confidence",
]
# Date shapes counted by extract_metadata (matched values are never stored):
#   20YY-M-D / 20YY/M/D, M/D/YY or M/D/20YY, and "Mon D, 20YY".
DATE_PATTERNS = [
    re.compile(r"\b(20\d{2}[-/](?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01]))\b"),
    re.compile(r"\b((?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01])[-/](?:20)?\d{2})\b"),
    re.compile(r"\b((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+20\d{2})\b", re.I),
]
# Dollar amounts such as "$45", "$ 45", "$1,234.56", "$12345.00", "USD $9.99".
# The integer part is either comma-grouped thousands or a plain digit run, so
# large amounts written without separators (5+ digits) are matched as well.
AMOUNT_RE = re.compile(r"(?<!\w)(?:USD\s*)?\$\s?(?:\d{1,3}(?:,\d{3})+|\d+)(?:\.\d{2})?\b", re.I)
EMAIL_RE = re.compile(r"\b[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\b")
# 10-digit North American numbers, optional +1 prefix, ()/-/./space separators.
PHONE_RE = re.compile(r"\b(?:\+?1[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){2}\d{4}\b")
# Reference IDs introduced by a keyword, e.g. "Invoice #: INV-2024-001".
ACCOUNT_RE = re.compile(r"\b(?:account|acct|policy|invoice|member|case|claim)\s*(?:#|no\.?|id)?\s*[:\-]?\s*[A-Z0-9-]{4,}\b", re.I)
# US SSN shape (NNN-NN-NNNN); only its presence is ever reported.
SSN_LIKE_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
# needs_attention reason -> keywords looked for in the lower-cased text.
ATTENTION_KEYWORDS = {
    "due_date_present": ["due", "payment due", "pay by", "deadline"],
    "amount_due": ["amount due", "balance due", "total due", "$"],
    "action_required_language": ["action required", "please respond", "complete", "submit", "renew", "verify"],
    "signature_required": ["signature", "sign and return", "signed"],
    "appointment_or_deadline": ["appointment", "scheduled", "reservation", "hearing"],
    "account_security": ["security", "password", "unauthorized", "fraud", "verify your account"],
    "medical_followup": ["follow up", "lab result", "referral", "insurance"],
    "tax_deadline": ["irs", "tax", "1099", "w-2", "deadline"],
}
# classify_rule label -> keywords; each keyword found scores one point.
# Every key here is also listed in CATEGORY_LABELS.
CATEGORY_KEYWORDS = {
    "receipt": ["receipt", "subtotal", "cashier", "change", "store"],
    "bill_or_invoice": ["invoice", "amount due", "balance due", "statement", "payment due"],
    "tax_or_financial": ["tax", "irs", "1099", "w-2", "bank", "routing"],
    "medical_or_insurance": ["medical", "insurance", "clinic", "patient", "claim"],
    "legal_or_government": ["court", "government", "department", "notice", "license"],
    "form_or_application": ["application", "form", "signature", "submit"],
    "travel_or_ticket": ["boarding", "ticket", "itinerary", "reservation", "gate"],
    "screenshot_conversation": ["message", "chat", "reply", "conversation"],
    "screenshot_web_or_app": ["login", "browser", "app", "settings", "dashboard"],
    "identity_or_sensitive": ["ssn", "passport", "driver license", "social security"],
}
@dataclasses.dataclass
class TriageOptions:
    """Settings for one triage run, built by ``cli`` or by library callers."""

    # Upper bound on PDF pages text-extracted / rasterised per file.
    max_pages: int = 3
    # When True, the extracted text is copied into the result as "ocr_text".
    include_ocr_text: bool = False
    # When True, image/PDF-page decoding is skipped; hashing, text extraction
    # and the embedding call still run.
    dry_run: bool = False
    # Whether to call the local embeddings service at all.
    use_embeddings: bool = True
    # Endpoint receiving the JSON embedding request.
    embedding_url: str = DEFAULT_EMBED_URL
    # Input paths must resolve under one of these roots. Each instance gets
    # its own list copied from DEFAULT_ALLOWED_ROOTS.
    allowed_roots: list[Path] = dataclasses.field(
        default_factory=lambda: list(DEFAULT_ALLOWED_ROOTS)
    )
    # When True, the resolved absolute path is added as "source_path".
    include_full_path: bool = False
    # Timeout in seconds for the embedding HTTP request.
    timeout_seconds: float = 10.0
def read_npu_busy() -> int | None:
try:
return int(NPU_BUSY_PATH.read_text().strip())
except Exception:
return None
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of *path*, streaming it in 1 MiB reads."""
    digest = hashlib.sha256()
    chunk_size = 1024 * 1024
    with path.open("rb") as stream:
        while block := stream.read(chunk_size):
            digest.update(block)
    return digest.hexdigest()
def under_allowed_root(path: Path, roots: list[Path]) -> bool:
    """Report whether *path* resolves to a location inside any of *roots*.

    Both *path* and every root are passed through ``Path.resolve()`` before
    comparing, so symlinks are followed and ``..`` segments collapsed; a
    path equal to a root counts as inside it. An empty *roots* list allows
    nothing.
    """
    target = path.resolve()
    return any(target.is_relative_to(root.resolve()) for root in roots)
def sidecar_text(path: Path) -> tuple[str, str | None]:
for suffix in (path.suffix + ".txt", ".txt"):
candidate = path.with_suffix(suffix) if suffix.startswith(path.suffix) else path.with_suffix(suffix)
if candidate.exists() and candidate.is_file():
try:
return candidate.read_text(errors="replace")[:12000], f"sidecar:{candidate.name}"
except Exception:
return "", "sidecar_unreadable"
return "", None
def extract_pdf_text(path: Path, max_pages: int) -> tuple[str, str | None]:
# Optional dependency; tests do not require it. Keeps PDF support local-only when installed.
try:
import pypdf # type: ignore
except Exception:
return "", "pypdf_not_installed"
try:
reader = pypdf.PdfReader(str(path))
if getattr(reader, "is_encrypted", False):
return "", "pdf_encrypted"
chunks = []
for page in reader.pages[:max_pages]:
chunks.append(page.extract_text() or "")
return "\n".join(chunks)[:12000], "pypdf_cpu"
except Exception as exc:
return "", f"pdf_text_error:{type(exc).__name__}"
def load_image_pages(path: Path, max_pages: int) -> tuple[list[Image.Image], str | None]:
    """Decode *path* into RGB PIL images, one per page.

    PDFs are rasterised with the optional ``pypdfium2`` package (the first
    *max_pages* pages at scale 1.5). Any other file is opened with Pillow,
    rotated according to its EXIF orientation, and returned as one page.
    Underlying document/file handles are closed before returning.

    Returns:
        ``(pages, error)``. On success *error* is ``None``. On failure *pages*
        is empty and *error* is a tag: ``"pypdfium2_not_installed"``,
        ``"pdf_render_error:<Exc>"`` or ``"image_decode_error:<Exc>"``.
    """
    if path.suffix.lower() == ".pdf":
        return _render_pdf_pages(path, max_pages)
    return _decode_image(path)


def _render_pdf_pages(path: Path, max_pages: int) -> tuple[list[Image.Image], str | None]:
    """Rasterise up to *max_pages* PDF pages, always closing the document."""
    try:
        import pypdfium2 as pdfium  # type: ignore
    except Exception:
        return [], "pypdfium2_not_installed"
    try:
        pdf = pdfium.PdfDocument(str(path))
    except Exception as exc:
        return [], f"pdf_render_error:{type(exc).__name__}"
    try:
        pages = []
        for i in range(min(len(pdf), max_pages)):
            bitmap = pdf[i].render(scale=1.5)
            # convert() returns a new image, so it stays valid after the
            # document is closed below.
            pages.append(bitmap.to_pil().convert("RGB"))
        return pages, None
    except Exception as exc:
        return [], f"pdf_render_error:{type(exc).__name__}"
    finally:
        try:
            pdf.close()
        except Exception:
            pass  # best-effort cleanup; must not mask the render outcome


def _decode_image(path: Path) -> tuple[list[Image.Image], str | None]:
    """Open a raster image with Pillow, apply EXIF rotation, return it as RGB."""
    try:
        with Image.open(path) as src:
            # exif_transpose() and convert() yield new, loaded images, so the
            # result remains usable once the source file handle is closed.
            return [ImageOps.exif_transpose(src).convert("RGB")], None
    except Exception as exc:
        return [], f"image_decode_error:{type(exc).__name__}"
def normalize_for_hash_features(img: Image.Image) -> dict[str, Any]:
    """Compute cheap global image statistics used as classification hints.

    A copy of *img* is shrunk to fit within 224x224 and converted to 8-bit
    grayscale. The mean and standard deviation of its luma histogram are
    returned as ``mean_luma`` and ``contrast``, together with the aspect
    ratio (width / height) of the original, unshrunk image.
    """
    thumb = ImageOps.contain(img.copy(), (224, 224)).convert("L")
    counts = thumb.histogram()
    total = max(1, thumb.width * thumb.height)
    mean_luma = sum(level * n for level, n in enumerate(counts)) / total
    var_luma = sum(n * (level - mean_luma) ** 2 for level, n in enumerate(counts)) / total
    return {
        "mean_luma": round(mean_luma, 2),
        "contrast": round(var_luma ** 0.5, 2),
        "aspect_ratio": round(img.width / max(1, img.height), 3),
    }
def classify_rule(text: str, image_features: dict[str, Any]) -> dict[str, Any]:
    """Pick a coarse document category from keyword evidence in *text*.

    Each label in ``CATEGORY_KEYWORDS`` scores one point per keyword found in
    the lower-cased text. Keywords that begin with a letter or digit must
    start a word: "tax" matches "taxes" but not "syntax", and "app" does not
    match "happen". The highest score wins, and ties keep the earlier label.
    With no keyword hits, a wide image (aspect ratio > 1.3) is guessed to be
    ``screenshot_web_or_app``.

    Confidence is ``0.35 + 0.18 * score`` capped at 0.92, or 0.2 with no
    evidence. Anything below 0.45 is reported as ``unknown_or_low_confidence``.

    Args:
        text: Extracted document text (may be empty).
        image_features: Feature dict; only ``"aspect_ratio"`` is read
            (default 1.0).

    Returns:
        A CPU rule-based classification record. ``candidate_labels`` is a
        fresh list, so callers cannot mutate ``CATEGORY_LABELS``.
    """
    lowered = text.lower()

    def hits(words: list[str]) -> int:
        # Word-initial keywords are anchored at a word boundary; keywords
        # starting with punctuation fall back to a plain substring search.
        return sum(
            1
            for word in words
            if re.search((r"\b" if word[:1].isalnum() else "") + re.escape(word), lowered)
        )

    best_label = "unknown_or_low_confidence"
    best_score = 0
    for label, words in CATEGORY_KEYWORDS.items():
        score = hits(words)
        if score > best_score:
            best_label, best_score = label, score
    if best_score == 0 and image_features.get("aspect_ratio", 1.0) > 1.3:
        # No textual evidence: landscape-ish images are guessed as screenshots.
        best_label, best_score = "screenshot_web_or_app", 1
    confidence = min(0.35 + 0.18 * best_score, 0.92) if best_score else 0.2
    if confidence < 0.45:
        best_label = "unknown_or_low_confidence"
    return {
        "label": best_label,
        "confidence": round(confidence, 3),
        "device": "CPU",
        "stage": "category_classification",
        "method": "rule_based_fallback",
        "npu_status": "not_configured_for_prototype_v1",
        "candidate_labels": list(CATEGORY_LABELS),
    }
def extract_metadata(text: str) -> dict[str, Any]:
    """Summarise structured cues in *text* without exposing their values.

    Returns the number of distinct date strings (``DATE_PATTERNS``) and
    distinct dollar-amount strings (``AMOUNT_RE``). It also returns presence
    flags for organisation words, street addresses, phone numbers, e-mail
    addresses, account/policy IDs and SSN-shaped numbers. Matched strings are
    discarded, so ``raw_values_redacted`` is always True.
    """
    date_hits = {
        match.group(1)
        for pattern in DATE_PATTERNS
        for match in pattern.finditer(text)
    }
    amount_hits = set(AMOUNT_RE.findall(text))
    org_hit = re.search(r"\b(?:inc|llc|clinic|department|bank|insurance|store)\b", text, re.I)
    address_hit = re.search(r"\b\d{2,5}\s+[A-Za-z0-9 .]+\s+(?:st|street|ave|avenue|rd|road|blvd|drive|dr)\b", text, re.I)
    entities = {
        "org_present": org_hit is not None,
        "address_present": address_hit is not None,
        "phone_present": PHONE_RE.search(text) is not None,
        "email_present": EMAIL_RE.search(text) is not None,
        "policy_or_account_id_present": ACCOUNT_RE.search(text) is not None,
        "identity_number_like_present": SSN_LIKE_RE.search(text) is not None,
    }
    return {
        "dates_count": len(date_hits),
        "amounts_count": len(amount_hits),
        "detected_entities": entities,
        "raw_values_redacted": True,
    }
def call_embeddings(text: str, url: str, timeout: float) -> dict[str, Any]:
if not text.strip():
return {"used": False, "device": "NPU", "status": "skipped_no_text", "npu_busy_delta_us": 0}
before = read_npu_busy()
payload = json.dumps({"input": text[:2048], "purpose": "document"}).encode()
req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"})
t0 = time.perf_counter()
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read(1024 * 1024)
status = resp.status
parsed = json.loads(body.decode())
dim = None
if isinstance(parsed, dict) and parsed.get("data"):
emb = parsed["data"][0].get("embedding", [])
dim = len(emb) if isinstance(emb, list) else None
after = read_npu_busy()
delta = (after - before) if before is not None and after is not None else None
return {
"used": True,
"device": "NPU",
"status": "ok" if status == 200 else f"http_{status}",
"embedding_dim": dim,
"wall_ms": round((time.perf_counter() - t0) * 1000, 2),
"npu_busy_delta_us": delta,
"verified_npu": bool(delta and delta > 0),
"endpoint": "127.0.0.1:18817",
}
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
after = read_npu_busy()
delta = (after - before) if before is not None and after is not None else None
return {
"used": False,
"device": "NPU",
"status": f"embedding_service_error:{type(exc).__name__}",
"npu_busy_delta_us": delta,
"verified_npu": False,
"endpoint": "127.0.0.1:18817",
}
def needs_attention(text: str, embedding_result: dict[str, Any]) -> dict[str, Any]:
    """Flag documents that probably need follow-up and list the reasons.

    Reasons come from three sources:

    * Each ``ATTENTION_KEYWORDS`` entry with at least one keyword in the
      lower-cased text.
    * ``amount_due`` when ``extract_metadata`` finds any dollar amount.
    * ``due_date_present`` when it finds any date.

    Keywords beginning with a letter or digit must start a word, so "irs"
    matches "IRS notice" but not "first", and "signed" does not match
    "designed". Punctuation keywords such as "$" match anywhere.

    Confidence is ``0.45 + 0.1 * len(reasons)`` capped at 0.9, or 0.35 with no
    reasons. It is raised by 0.05 (cap 0.95) when the embedding result
    reports ``verified_npu``.

    NOTE(review): the embedding vector itself does not influence the
    decision; only the NPU-verification flag adjusts confidence.

    Args:
        text: Extracted document text (may be empty).
        embedding_result: Record from the embedding step. It is echoed into
            the output, and its ``used`` / ``verified_npu`` keys select the
            device label and the confidence bump.
    """
    lowered = text.lower()

    def mentions(word: str) -> bool:
        prefix = r"\b" if word[:1].isalnum() else ""
        return re.search(prefix + re.escape(word), lowered) is not None

    found = {
        reason
        for reason, words in ATTENTION_KEYWORDS.items()
        if any(mentions(word) for word in words)
    }
    meta = extract_metadata(text)
    if meta["amounts_count"]:
        found.add("amount_due")
    if meta["dates_count"]:
        found.add("due_date_present")
    reasons = sorted(found)
    value = bool(reasons)
    confidence = min(0.45 + 0.1 * len(reasons), 0.9) if value else 0.35
    if embedding_result.get("verified_npu"):
        confidence = min(confidence + 0.05, 0.95)
    return {
        "value": value,
        "confidence": round(confidence, 3),
        "reasons": reasons or (["low_confidence"] if not text.strip() else []),
        "device": "NPU+CPU" if embedding_result.get("used") else "CPU",
        "stage": "needs_attention",
        "method": "NPU embedding verification + CPU rules" if embedding_result.get("used") else "CPU rules fallback",
        "embedding": embedding_result,
    }
def infer_media_type(path: Path, is_pdf_page: bool = False) -> str:
    """Map *path* to one of ``"pdf_page"``, ``"pdf"``, ``"image"`` or ``"unknown"``.

    *is_pdf_page* short-circuits to ``"pdf_page"``. Otherwise a ``.pdf``
    extension (any case) yields ``"pdf"``, and an extension that
    ``mimetypes`` maps to an ``image/*`` type yields ``"image"``.
    """
    if is_pdf_page:
        return "pdf_page"
    if path.suffix.lower() == ".pdf":
        return "pdf"
    guessed_type = mimetypes.guess_type(path.name)[0]
    is_image = guessed_type is not None and guessed_type.startswith("image/")
    return "image" if is_image else "unknown"
def triage_file(path_like: str | Path, options: TriageOptions | None = None) -> dict[str, Any]:
    """Triage one local image/PDF and return a JSON-serialisable record.

    Pipeline:

    1. Allowed-root and size checks, then SHA-256 hashing.
    2. Text extraction: sidecar first, then the PDF text layer for PDFs.
    3. Optional page decoding (skipped on dry run).
    4. The optional embedding call.
    5. CPU rules for category, needs-attention and metadata.

    Raw text is only included when ``options.include_ocr_text`` is set.

    Raises:
        ValueError: the path is outside ``options.allowed_roots`` or larger
            than ``MAX_FILE_BYTES``.
        FileNotFoundError: the path does not exist or is not a regular file.
    """
    options = options or TriageOptions()
    path = Path(path_like).expanduser()
    resolved = path.resolve()
    if not under_allowed_root(resolved, options.allowed_roots):
        raise ValueError(f"path is outside allowed roots: {path}")
    if not resolved.is_file():  # also False when the path does not exist
        raise FileNotFoundError(str(path))
    size = resolved.stat().st_size
    if size > MAX_FILE_BYTES:
        raise ValueError(f"file too large for prototype limit: {size} bytes")
    file_hash = sha256_file(resolved)
    is_pdf = resolved.suffix.lower() == ".pdf"

    # Text: prefer a sidecar; PDFs without usable sidecar text fall back to
    # their embedded text layer.
    text, text_source = sidecar_text(resolved)
    pdf_text_status = None
    if is_pdf and not text.strip():
        text, pdf_text_status = extract_pdf_text(resolved, options.max_pages)
        text_source = pdf_text_status
    # Whitespace-only text (e.g. the "\n" joined from blank PDF pages) is not
    # usable and must not be reported as available OCR output.
    has_text = bool(text.strip())

    if options.dry_run:
        images, render_error = [], None
    else:
        images, render_error = load_image_pages(resolved, options.max_pages)

    if options.use_embeddings:
        embedding_result = call_embeddings(text, options.embedding_url, options.timeout_seconds)
    else:
        embedding_result = {"used": False, "device": "NPU", "status": "disabled", "npu_busy_delta_us": 0, "verified_npu": False}
    attn = needs_attention(text, embedding_result)
    meta = extract_metadata(text)
    page_media_type = infer_media_type(resolved, is_pdf)

    def page_record(index: int, image_info: dict[str, Any], classification: dict[str, Any]) -> dict[str, Any]:
        """Assemble one page entry; the text-derived fields are file-level."""
        return {
            "page_index": index,
            "media_type": page_media_type,
            "image": image_info,
            "classification": classification,
            "needs_attention": attn,
            "metadata": meta,
            "ocr": {"available": has_text, "quality": 0.7 if has_text else 0.0, "device": "CPU", "text_source": text_source},
        }

    pages: list[dict[str, Any]] = []
    for idx, img in enumerate(images):
        features = normalize_for_hash_features(img)
        image_info = {
            "width": img.width,
            "height": img.height,
            "orientation": "portrait" if img.height >= img.width else "landscape",
            **features,
        }
        pages.append(page_record(idx, image_info, classify_rule(text, features)))
    if not pages:
        # Dry run, missing optional renderer, or decode failure: still emit a
        # single file-level record so callers always get page 0.
        empty_image = {"width": None, "height": None, "orientation": None, "render_error": render_error}
        pages.append(page_record(0, empty_image, classify_rule(text, {"aspect_ratio": 1.0})))

    errors = [render_error] if render_error else []
    # "pypdf_cpu" is extract_pdf_text's success tag, not an error.
    if pdf_text_status and pdf_text_status != "pypdf_cpu":
        errors.append(pdf_text_status)

    result: dict[str, Any] = {
        "file_id": f"sha256:{file_hash}",
        "source_path_basename": resolved.name,
        "media_type": infer_media_type(resolved),
        "file_size_bytes": size,
        "page_count": len(pages),
        "pages": pages,
        "processing_device_summary": {
            "file_intake": "CPU",
            "pdf_rendering": "CPU" if is_pdf else "not_applicable",
            "image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)",
            "ocr_text_extraction": "CPU/local sidecar or optional local PDF text extractor",
            "needs_attention_embedding": "NPU via local :18817" if embedding_result.get("used") else "CPU fallback/no text",
            "metadata_extraction": "CPU",
            "npu_verified": bool(embedding_result.get("verified_npu")),
            "npu_busy_delta_us": embedding_result.get("npu_busy_delta_us"),
        },
        "privacy": {
            "external_uploads": False,
            "localhost_only_embedding_call": bool(options.use_embeddings),
            "raw_text_logged": False,
            "raw_values_redacted": True,
            "full_path_included": options.include_full_path,
        },
        "errors": errors,
    }
    if options.include_full_path:
        result["source_path"] = str(resolved)
    if options.include_ocr_text:
        result["ocr_text"] = text
    return result
def triage_batch(paths: list[str], options: TriageOptions | None = None) -> dict[str, Any]:
    """Triage several files, isolating failures to the file that caused them.

    Each entry in ``files`` is either ``{"ok": True, "result": ...}`` or an
    error stub carrying the basename, exception class name and message. The
    batch ``ok`` flag is True only when every file succeeded (vacuously True
    for no paths). ``generated_at`` is an ISO-8601 UTC timestamp.
    """

    def run_one(raw_path: str) -> dict[str, Any]:
        try:
            return {"ok": True, "result": triage_file(raw_path, options)}
        except Exception as exc:  # batch boundary: record the failure, continue
            return {
                "ok": False,
                "source_path_basename": Path(raw_path).name,
                "error": type(exc).__name__,
                "message": str(exc),
            }

    files = [run_one(raw_path) for raw_path in paths]
    return {
        "ok": all(entry["ok"] for entry in files),
        "files": files,
        "generated_at": dt.datetime.now(dt.timezone.utc).isoformat(),
    }
def cli() -> int:
parser = argparse.ArgumentParser(description="Local document/image triage prototype")
parser.add_argument("paths", nargs="+", help="local image/PDF paths")
parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; defaults to cwd")
parser.add_argument("--max-pages", type=int, default=3)
parser.add_argument("--include-ocr-text", action="store_true")
parser.add_argument("--include-full-path", action="store_true")
parser.add_argument("--no-embeddings", action="store_true", help="disable local NPU embedding call")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--pretty", action="store_true")
args = parser.parse_args()
roots = [Path(p) for p in args.allowed_root] if args.allowed_root else [Path.cwd()]
options = TriageOptions(
max_pages=args.max_pages,
include_ocr_text=args.include_ocr_text,
dry_run=args.dry_run,
use_embeddings=not args.no_embeddings,
allowed_roots=roots,
include_full_path=args.include_full_path,
)
out = triage_batch(args.paths, options)
print(json.dumps(out, indent=2 if args.pretty else None, sort_keys=True))
return 0 if out["ok"] else 2
if __name__ == "__main__":
raise SystemExit(cli())