#!/usr/bin/env python3 """Local-only document/image triage prototype. CPU stages: - local file intake, hashing, MIME/extension checks - image/PDF-page decoding and normalization - optional sidecar/native-text extraction - regex metadata extraction and rule-based category fallback NPU stages: - needs-attention semantic embedding via the existing local OpenVINO NPU embeddings service on 127.0.0.1:18817, verified by sysfs busy-time delta. No external uploads are performed. The only network call is localhost to the embedding service when enabled. """ from __future__ import annotations import argparse import base64 import dataclasses import datetime as dt import hashlib import io import json import mimetypes import os import re import sys import time import urllib.error import urllib.request from pathlib import Path from typing import Any try: from PIL import Image, ImageOps except Exception as exc: # pragma: no cover - caught in CLI smoke raise SystemExit("Pillow is required: install pillow in the active Python env") from exc NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") DEFAULT_EMBED_URL = "http://127.0.0.1:18817/v1/embeddings" DEFAULT_ALLOWED_ROOTS = [Path.cwd()] MAX_FILE_BYTES = 25 * 1024 * 1024 CATEGORY_LABELS = [ "receipt", "bill_or_invoice", "tax_or_financial", "medical_or_insurance", "legal_or_government", "form_or_application", "travel_or_ticket", "screenshot_conversation", "screenshot_web_or_app", "identity_or_sensitive", "photo_misc", "unknown_or_low_confidence", ] DATE_PATTERNS = [ re.compile(r"\b(20\d{2}[-/](?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01]))\b"), re.compile(r"\b((?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12]\d|3[01])[-/](?:20)?\d{2})\b"), re.compile(r"\b((?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]*\s+\d{1,2},?\s+20\d{2})\b", re.I), ] AMOUNT_RE = re.compile(r"(? int | None: try: return int(NPU_BUSY_PATH.read_text().strip()) except Exception: return None def sha256_file(path: Path) -> str: h = hashlib.sha256() with path.open("rb") as f: for chunk in iter(lambda: f.read(1024 * 1024), b""): h.update(chunk) return h.hexdigest() def under_allowed_root(path: Path, roots: list[Path]) -> bool: resolved = path.resolve() for root in roots: try: resolved.relative_to(root.resolve()) return True except ValueError: continue return False def sidecar_text(path: Path) -> tuple[str, str | None]: for suffix in (path.suffix + ".txt", ".txt"): candidate = path.with_suffix(suffix) if suffix.startswith(path.suffix) else path.with_suffix(suffix) if candidate.exists() and candidate.is_file(): try: return candidate.read_text(errors="replace")[:12000], f"sidecar:{candidate.name}" except Exception: return "", "sidecar_unreadable" return "", None def extract_pdf_text(path: Path, max_pages: int) -> tuple[str, str | None]: # Optional dependency; tests do not require it. Keeps PDF support local-only when installed. try: import pypdf # type: ignore except Exception: return "", "pypdf_not_installed" try: reader = pypdf.PdfReader(str(path)) if getattr(reader, "is_encrypted", False): return "", "pdf_encrypted" chunks = [] for page in reader.pages[:max_pages]: chunks.append(page.extract_text() or "") return "\n".join(chunks)[:12000], "pypdf_cpu" except Exception as exc: return "", f"pdf_text_error:{type(exc).__name__}" def load_image_pages(path: Path, max_pages: int) -> tuple[list[Image.Image], str | None]: ext = path.suffix.lower() if ext == ".pdf": try: import pypdfium2 as pdfium # type: ignore except Exception: return [], "pypdfium2_not_installed" try: pdf = pdfium.PdfDocument(str(path)) pages = [] for i in range(min(len(pdf), max_pages)): bitmap = pdf[i].render(scale=1.5) pages.append(bitmap.to_pil().convert("RGB")) return pages, None except Exception as exc: return [], f"pdf_render_error:{type(exc).__name__}" try: img = Image.open(path) img = ImageOps.exif_transpose(img).convert("RGB") return [img], None except Exception as exc: return [], f"image_decode_error:{type(exc).__name__}" def normalize_for_hash_features(img: Image.Image) -> dict[str, Any]: small = ImageOps.contain(img.copy(), (224, 224)) gray = small.convert("L") hist = gray.histogram() pixels = max(1, gray.width * gray.height) mean = sum(i * c for i, c in enumerate(hist)) / pixels variance = sum(((i - mean) ** 2) * c for i, c in enumerate(hist)) / pixels return { "mean_luma": round(mean, 2), "contrast": round(variance ** 0.5, 2), "aspect_ratio": round(img.width / max(1, img.height), 3), } def classify_rule(text: str, image_features: dict[str, Any]) -> dict[str, Any]: t = text.lower() best_label = "unknown_or_low_confidence" best_score = 0 for label, words in CATEGORY_KEYWORDS.items(): score = sum(1 for word in words if word in t) if score > best_score: best_label, best_score = label, score if best_score == 0: ar = image_features.get("aspect_ratio", 1.0) if ar > 1.3: best_label, best_score = "screenshot_web_or_app", 1 else: best_label, best_score = "unknown_or_low_confidence", 0 confidence = min(0.35 + 0.18 * best_score, 0.92) if best_score else 0.2 if confidence < 0.45: best_label = "unknown_or_low_confidence" return { "label": best_label, "confidence": round(confidence, 3), "device": "CPU", "stage": "category_classification", "method": "rule_based_fallback", "npu_status": "not_configured_for_prototype_v1", "candidate_labels": CATEGORY_LABELS, } def extract_metadata(text: str) -> dict[str, Any]: dates = [] for pat in DATE_PATTERNS: dates.extend(m.group(1) for m in pat.finditer(text)) amounts = AMOUNT_RE.findall(text) flags = { "org_present": bool(re.search(r"\b(?:inc|llc|clinic|department|bank|insurance|store)\b", text, re.I)), "address_present": bool(re.search(r"\b\d{2,5}\s+[A-Za-z0-9 .]+\s+(?:st|street|ave|avenue|rd|road|blvd|drive|dr)\b", text, re.I)), "phone_present": bool(PHONE_RE.search(text)), "email_present": bool(EMAIL_RE.search(text)), "policy_or_account_id_present": bool(ACCOUNT_RE.search(text)), "identity_number_like_present": bool(SSN_LIKE_RE.search(text)), } return { "dates_count": len(set(dates)), "amounts_count": len(set(amounts)), "detected_entities": flags, "raw_values_redacted": True, } def call_embeddings(text: str, url: str, timeout: float) -> dict[str, Any]: if not text.strip(): return {"used": False, "device": "NPU", "status": "skipped_no_text", "npu_busy_delta_us": 0} before = read_npu_busy() payload = json.dumps({"input": text[:2048], "purpose": "document"}).encode() req = urllib.request.Request(url, data=payload, headers={"Content-Type": "application/json"}) t0 = time.perf_counter() try: with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read(1024 * 1024) status = resp.status parsed = json.loads(body.decode()) dim = None if isinstance(parsed, dict) and parsed.get("data"): emb = parsed["data"][0].get("embedding", []) dim = len(emb) if isinstance(emb, list) else None after = read_npu_busy() delta = (after - before) if before is not None and after is not None else None return { "used": True, "device": "NPU", "status": "ok" if status == 200 else f"http_{status}", "embedding_dim": dim, "wall_ms": round((time.perf_counter() - t0) * 1000, 2), "npu_busy_delta_us": delta, "verified_npu": bool(delta and delta > 0), "endpoint": "127.0.0.1:18817", } except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc: after = read_npu_busy() delta = (after - before) if before is not None and after is not None else None return { "used": False, "device": "NPU", "status": f"embedding_service_error:{type(exc).__name__}", "npu_busy_delta_us": delta, "verified_npu": False, "endpoint": "127.0.0.1:18817", } def needs_attention(text: str, embedding_result: dict[str, Any]) -> dict[str, Any]: t = text.lower() reasons = [] for reason, words in ATTENTION_KEYWORDS.items(): if any(word in t for word in words): reasons.append(reason) meta = extract_metadata(text) if meta["amounts_count"]: reasons.append("amount_due") if meta["dates_count"]: reasons.append("due_date_present") reasons = sorted(set(reasons)) value = bool(reasons) confidence = min(0.45 + 0.1 * len(reasons), 0.9) if value else 0.35 if embedding_result.get("verified_npu"): confidence = min(confidence + 0.05, 0.95) return { "value": value, "confidence": round(confidence, 3), "reasons": reasons or (["low_confidence"] if not text.strip() else []), "device": "NPU+CPU" if embedding_result.get("used") else "CPU", "stage": "needs_attention", "method": "NPU embedding verification + CPU rules" if embedding_result.get("used") else "CPU rules fallback", "embedding": embedding_result, } def infer_media_type(path: Path, is_pdf_page: bool = False) -> str: if is_pdf_page: return "pdf_page" mt, _ = mimetypes.guess_type(path.name) if path.suffix.lower() == ".pdf": return "pdf" if mt and mt.startswith("image/"): return "image" return "unknown" def triage_file(path_like: str | Path, options: TriageOptions | None = None) -> dict[str, Any]: options = options or TriageOptions() path = Path(path_like).expanduser() resolved = path.resolve() if not under_allowed_root(resolved, options.allowed_roots): raise ValueError(f"path is outside allowed roots: {path}") if not resolved.exists() or not resolved.is_file(): raise FileNotFoundError(str(path)) size = resolved.stat().st_size if size > MAX_FILE_BYTES: raise ValueError(f"file too large for prototype limit: {size} bytes") file_hash = sha256_file(resolved) text, text_source = sidecar_text(resolved) pdf_text_status = None if resolved.suffix.lower() == ".pdf" and not text: text, pdf_text_status = extract_pdf_text(resolved, options.max_pages) text_source = pdf_text_status pages: list[dict[str, Any]] = [] render_error = None if not options.dry_run: images, render_error = load_image_pages(resolved, options.max_pages) else: images = [] if not images and options.dry_run: images = [] elif not images: # Return a file-level record even if PDF rendering is unavailable. images = [] embedding_result = call_embeddings(text, options.embedding_url, options.timeout_seconds) if options.use_embeddings else {"used": False, "device": "NPU", "status": "disabled", "npu_busy_delta_us": 0, "verified_npu": False} attn = needs_attention(text, embedding_result) meta = extract_metadata(text) if images: for idx, img in enumerate(images): features = normalize_for_hash_features(img) classification = classify_rule(text, features) pages.append({ "page_index": idx, "media_type": infer_media_type(resolved, resolved.suffix.lower() == ".pdf"), "image": {"width": img.width, "height": img.height, "orientation": "portrait" if img.height >= img.width else "landscape", **features}, "classification": classification, "needs_attention": attn, "metadata": meta, "ocr": {"available": bool(text), "quality": 0.7 if text else 0.0, "device": "CPU", "text_source": text_source}, }) else: classification = classify_rule(text, {"aspect_ratio": 1.0}) pages.append({ "page_index": 0, "media_type": infer_media_type(resolved, resolved.suffix.lower() == ".pdf"), "image": {"width": None, "height": None, "orientation": None, "render_error": render_error}, "classification": classification, "needs_attention": attn, "metadata": meta, "ocr": {"available": bool(text), "quality": 0.7 if text else 0.0, "device": "CPU", "text_source": text_source}, }) result: dict[str, Any] = { "file_id": f"sha256:{file_hash}", "source_path_basename": resolved.name, "media_type": infer_media_type(resolved), "file_size_bytes": size, "page_count": len(pages), "pages": pages, "processing_device_summary": { "file_intake": "CPU", "pdf_rendering": "CPU" if resolved.suffix.lower() == ".pdf" else "not_applicable", "image_category_classification": "CPU rule fallback (NPU model not configured in prototype v1)", "ocr_text_extraction": "CPU/local sidecar or optional local PDF text extractor", "needs_attention_embedding": "NPU via local :18817" if embedding_result.get("used") else "CPU fallback/no text", "metadata_extraction": "CPU", "npu_verified": bool(embedding_result.get("verified_npu")), "npu_busy_delta_us": embedding_result.get("npu_busy_delta_us"), }, "privacy": { "external_uploads": False, "localhost_only_embedding_call": bool(options.use_embeddings), "raw_text_logged": False, "raw_values_redacted": True, "full_path_included": options.include_full_path, }, "errors": [e for e in [render_error, pdf_text_status if pdf_text_status and not text else None] if e], } if options.include_full_path: result["source_path"] = str(resolved) if options.include_ocr_text: result["ocr_text"] = text return result def triage_batch(paths: list[str], options: TriageOptions | None = None) -> dict[str, Any]: items = [] for p in paths: try: items.append({"ok": True, "result": triage_file(p, options)}) except Exception as exc: items.append({"ok": False, "source_path_basename": Path(p).name, "error": type(exc).__name__, "message": str(exc)}) return {"ok": all(item["ok"] for item in items), "files": items, "generated_at": dt.datetime.now(dt.UTC).isoformat()} def cli() -> int: parser = argparse.ArgumentParser(description="Local document/image triage prototype") parser.add_argument("paths", nargs="+", help="local image/PDF paths") parser.add_argument("--allowed-root", action="append", default=[], help="allowed local root; defaults to cwd") parser.add_argument("--max-pages", type=int, default=3) parser.add_argument("--include-ocr-text", action="store_true") parser.add_argument("--include-full-path", action="store_true") parser.add_argument("--no-embeddings", action="store_true", help="disable local NPU embedding call") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--pretty", action="store_true") args = parser.parse_args() roots = [Path(p) for p in args.allowed_root] if args.allowed_root else [Path.cwd()] options = TriageOptions( max_pages=args.max_pages, include_ocr_text=args.include_ocr_text, dry_run=args.dry_run, use_embeddings=not args.no_embeddings, allowed_roots=roots, include_full_path=args.include_full_path, ) out = triage_batch(args.paths, options) print(json.dumps(out, indent=2 if args.pretty else None, sort_keys=True)) return 0 if out["ok"] else 2 if __name__ == "__main__": raise SystemExit(cli())