Files
swarm-master/scripts/npu-batch-triage-dry-run.py
T
2026-06-05 15:52:43 -07:00

524 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""Explicit-root dry-run batch triage for local documents, images, and audio.

This wrapper is intentionally report-only. It requires a lane-scoped approved
root in a manifest, rejects request roots that broaden that approval, redacts raw
text/transcripts by default, never mutates Obsidian, RAG/vector DBs, files,
routing, memory, or services, and never performs outbound sends.
"""
from __future__ import annotations
import argparse
import datetime as dt
import hashlib
import ipaddress
import importlib.util
import json
import mimetypes
import os
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
try:
import yaml # type: ignore
except Exception as exc: # pragma: no cover
raise SystemExit("PyYAML is required to read triage root manifests") from exc
# Lane names accepted by --lane; a lane must also be approved in the manifest.
LANES = (
    "screenshots",
    "receipts",
    "downloads",
    "obsidian_attachments",
    "voice_memos",
    "meeting_snippets",
)
# Lanes whose files are transcribed through the Whisper endpoint.
AUDIO_LANES = {"voice_memos", "meeting_snippets"}
# Lanes handled by the document/image triage module (or the sidecar fallback).
DOC_IMAGE_LANES = {"screenshots", "receipts", "downloads", "obsidian_attachments"}
# Directory names pruned from the file walk.
SKIP_DIR_NAMES = {".git", ".obsidian", "__pycache__", ".cache", "cache", "chroma", "chromadb", "vector_db", "vectors"}
# sysfs counter sampled before and after work to compute an NPU busy-time delta.
NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us")
# Default transcription endpoint: loopback host, port 18816.
DEFAULT_WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions"
# Mutation flags echoed in every report; every value is False.
MUTATIONS_FALSE = {
    "obsidian": False,
    "rag": False,
    "vector_db": False,
    "sends": False,
    "file_moves": False,
    "routing": False,
    "memory": False,
    "service_restarts": False,
}
# Case-insensitive patterns; the key of each matching pattern becomes a "reason".
ACTION_PATTERNS = {
    "follow_up": re.compile(r"\b(follow up|follow-up|circle back|reply|respond)\b", re.I),
    "date_or_deadline": re.compile(r"\b(deadline|due|by (?:mon|tue|wed|thu|fri|sat|sun)|20\d{2}[-/]\d{1,2}[-/]\d{1,2})\b", re.I),
    "decision": re.compile(r"\b(decided|decision|approved|rejected|go with|choose)\b", re.I),
    "task": re.compile(r"\b(todo|to-do|action item|assign|need to|please)\b", re.I),
}
class FailClosed(Exception):
    """Signal that a safety precondition failed and the run must stop.

    The message is a short machine-readable reason such as
    ``lane_unapproved:<lane>``; ``main`` reports it as a ``fail_closed`` result.
    """
def sha256_text(text: str) -> str:
    """Return ``"sha256:<hex>"`` for the UTF-8 encoding of *text*.

    Characters that cannot be encoded are replaced rather than raising.
    """
    encoded = text.encode("utf-8", errors="replace")
    digest = hashlib.sha256(encoded).hexdigest()
    return f"sha256:{digest}"
def sha256_file(path: Path) -> str:
    """Return ``"sha256:<hex>"`` for the bytes of the file at *path*.

    The file is read in 1 MiB blocks so large files are never held in memory
    at once.
    """
    block_size = 1024 * 1024
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        while True:
            block = stream.read(block_size)
            if block == b"":
                break
            digest.update(block)
    return "sha256:" + digest.hexdigest()
def read_busy(path: Path = NPU_BUSY_PATH) -> int | None:
    """Read the integer counter stored at *path*.

    Returns ``None`` when the file cannot be read or does not hold an
    integer; this probe is best-effort and never raises.
    """
    try:
        raw_counter = path.read_text()
        busy_us = int(raw_counter.strip())
    except Exception:
        return None
    return busy_us
def validate_local_whisper_url(whisper_url: str) -> str:
    """Fail closed unless Whisper transcription stays on the approved loopback service.

    Checks run in this order: the URL parses, the scheme is ``http``, no
    credentials are embedded, the port is 18816, and the host is ``localhost``
    or a loopback IP address.

    Returns:
        *whisper_url* unchanged when every check passes.

    Raises:
        FailClosed: with a ``whisper_url_*`` reason naming the first failed check.
    """
    try:
        parts = urllib.parse.urlparse(whisper_url)
        port = parts.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError as exc:
        raise FailClosed("whisper_url_invalid") from exc

    if parts.scheme != "http":
        raise FailClosed("whisper_url_scheme_not_http")
    if parts.username or parts.password:
        raise FailClosed("whisper_url_credentials_not_allowed")
    if port != 18816:
        raise FailClosed("whisper_url_port_not_approved")

    hostname = (parts.hostname or "").strip().lower()
    loopback = hostname == "localhost"
    if not loopback:
        try:
            loopback = ipaddress.ip_address(hostname).is_loopback
        except ValueError:
            # Not an IP literal at all; only "localhost" is accepted by name.
            loopback = False
    if not loopback:
        raise FailClosed("whisper_url_not_loopback")
    return whisper_url
def is_under(path: Path, root: Path) -> bool:
    """Return True when *path*, after resolving symlinks, is *root* or lies beneath it."""
    try:
        candidate = path.resolve()
        base = root.resolve()
        candidate.relative_to(base)
    except ValueError:
        return False
    else:
        return True
def load_manifest(path: Path) -> dict[str, Any]:
    """Load and validate the lane-approval manifest at *path*.

    The manifest must be a YAML mapping with ``version: 1``, a ``roots``
    mapping, and a ``policy`` mapping whose ``default_mode`` (if present) is
    ``dry_run`` and whose four safety switches hold their required values.

    Returns:
        The parsed manifest mapping.

    Raises:
        FailClosed: if the file is missing, unreadable, not valid YAML, or
            fails any structural or policy check. Read and parse errors are
            converted so that the caller's single ``FailClosed`` handler
            covers them instead of a raw traceback.
    """
    if not path.exists():
        raise FailClosed(f"manifest_missing:{path}")
    try:
        data = yaml.safe_load(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
        raise FailClosed(f"manifest_unreadable:{type(exc).__name__}") from exc
    if not isinstance(data, dict):
        raise FailClosed("manifest_invalid:not_mapping")
    if data.get("version") != 1:
        raise FailClosed("manifest_invalid:version_must_be_1")
    policy = data.get("policy") or {}
    if not isinstance(policy, dict):
        # A list or scalar here would otherwise fail with AttributeError below.
        raise FailClosed("policy_invalid:not_mapping")
    if policy.get("default_mode", "dry_run") != "dry_run":
        raise FailClosed("policy_invalid:default_mode_not_dry_run")
    required_policy = {
        "require_explicit_root": True,
        "allow_external_uploads": False,
        "allow_mutations": False,
        "log_raw_text": False,
    }
    for key, expected in required_policy.items():
        # Identity comparison: a missing key (None) or a truthy non-bool is rejected.
        if policy.get(key) is not expected:
            raise FailClosed(f"policy_invalid:{key}")
    if not isinstance(data.get("roots"), dict):
        raise FailClosed("manifest_invalid:roots_missing")
    return data
def resolve_lane_root(manifest: dict[str, Any], manifest_path: Path, lane: str, requested_root: str | None) -> tuple[dict[str, Any], Path, Path]:
    """Resolve the approved root for *lane* and the root actually selected.

    A relative manifest root is interpreted relative to the manifest's
    directory. When *requested_root* is given it must be an existing
    directory at or below the approved root; otherwise the approved root
    itself is selected.

    Returns:
        ``(lane_cfg, approved_root, selected_root)`` with both paths resolved.

    Raises:
        FailClosed: if the lane is missing, unapproved, has no root, either
            directory is unavailable, or the request broadens the approval.
    """
    roots = manifest.get("roots") or {}
    lane_cfg = roots.get(lane)
    if not isinstance(lane_cfg, dict):
        raise FailClosed(f"lane_missing:{lane}")
    if lane_cfg.get("approved") is not True:
        raise FailClosed(f"lane_unapproved:{lane}")

    root_value = lane_cfg.get("root")
    if not root_value:
        raise FailClosed(f"root_missing:{lane}")

    base = Path(str(root_value)).expanduser()
    if not base.is_absolute():
        base = manifest_path.parent / base
    approved_root = base.resolve()
    if not (approved_root.exists() and approved_root.is_dir()):
        raise FailClosed(f"approved_root_unavailable:{lane}")

    if requested_root:
        selected_root = Path(requested_root).expanduser().resolve()
    else:
        selected_root = approved_root.resolve()
    if not (selected_root.exists() and selected_root.is_dir()):
        raise FailClosed(f"request_root_unavailable:{lane}")
    if not is_under(selected_root, approved_root):
        raise FailClosed(f"request_root_broadens_approval:{lane}")

    return lane_cfg, approved_root, selected_root
def allowed_exts(lane_cfg: dict[str, Any]) -> set[str]:
    """Return the lane's allowed file extensions, lower-cased with a leading dot.

    Accepts a list/tuple/set of entries or a single string. A missing or
    ``None`` value, and any other type, yields an empty set so the caller can
    fail closed. Empty and ``None`` entries are ignored.
    """
    raw = lane_cfg.get("allowed_extensions")
    if isinstance(raw, str):
        # A bare string is one extension, not an iterable of characters.
        raw = [raw]
    if not isinstance(raw, (list, tuple, set, frozenset)):
        return set()

    exts: set[str] = set()
    for entry in raw:
        if entry is None:
            continue
        ext = str(entry).strip().lower()
        if not ext or ext == ".":
            continue
        exts.add(ext if ext.startswith(".") else "." + ext)
    return exts
def iter_files(root: Path, approved_root: Path, exts: set[str], max_file_mb: float, max_age_days: float | None) -> tuple[list[Path], dict[str, int], int]:
    """Walk *root* and collect the files eligible for triage.

    Directory symlinks are not followed. Directories named in
    ``SKIP_DIR_NAMES`` and dot-directories are pruned. Dot-files are skipped
    as ``policy`` and are not counted in ``files_seen``. A file is accepted
    only if its resolved path lies inside both *approved_root* and *root*, it
    is a regular file, its suffix is in *exts*, it is at most *max_file_mb*
    MiB, and (when *max_age_days* is given) it is not older than that.

    Returns:
        ``(accepted, skipped, files_seen)``: resolved paths sorted newest
        first by the modification time observed during the walk, a counter
        per skip reason, and the number of non-hidden files examined.
    """
    skipped = {"extension": 0, "size": 0, "symlink_escape": 0, "not_regular_file": 0, "too_old": 0, "policy": 0}
    files_seen = 0
    now = time.time()
    max_bytes = int(max_file_mb * 1024 * 1024)
    max_age_seconds = None if max_age_days is None else max_age_days * 86400
    # Resolve the containment roots once instead of once per directory entry.
    approved_resolved = approved_root.resolve()
    walk_resolved = root.resolve()

    def resolve_inside(candidate: Path, *bases: Path):
        """Return the resolved candidate, or None if it escapes any base."""
        try:
            resolved_candidate = candidate.resolve()
            for base in bases:
                resolved_candidate.relative_to(base)
        except (OSError, RuntimeError, ValueError):
            return None
        return resolved_candidate

    dated: list[tuple[float, Path]] = []
    for dirpath, dirnames, filenames in os.walk(root, followlinks=False):
        dirnames[:] = [d for d in dirnames if d not in SKIP_DIR_NAMES and not d.startswith(".")]
        current = Path(dirpath)
        if resolve_inside(current, approved_resolved) is None:
            skipped["symlink_escape"] += 1
            dirnames[:] = []
            continue
        for name in filenames:
            if name.startswith("."):
                skipped["policy"] += 1
                continue
            files_seen += 1
            # The item builders compute path.relative_to(root), so a file that
            # resolves outside the walked root can never be reported; reject it
            # here together with files that escape the approved root.
            resolved = resolve_inside(current / name, approved_resolved, walk_resolved)
            if resolved is None:
                skipped["symlink_escape"] += 1
                continue
            if not resolved.is_file():
                skipped["not_regular_file"] += 1
                continue
            if resolved.suffix.lower() not in exts:
                skipped["extension"] += 1
                continue
            try:
                st = resolved.stat()
            except OSError:
                skipped["not_regular_file"] += 1
                continue
            if st.st_size > max_bytes:
                skipped["size"] += 1
                continue
            if max_age_seconds is not None and now - st.st_mtime > max_age_seconds:
                skipped["too_old"] += 1
                continue
            dated.append((st.st_mtime, resolved))

    # Sort on the mtime captured above: re-statting here would raise if a file
    # disappeared between the walk and the sort.
    dated.sort(key=lambda pair: pair[0], reverse=True)
    accepted = [path for _, path in dated]
    return accepted, skipped, files_seen
def load_doc_triage_module(repo_root: Path):
    """Import ``openvino-doc-image-triage-npu/triage.py`` under *repo_root*.

    The module is registered in ``sys.modules`` as ``doc_image_triage``
    before it is executed, so code that looks the module up by name while it
    is loading finds it. If execution fails, the previous registry entry (or
    its absence) is restored so no half-initialised module is left behind.

    Returns:
        The loaded module object.

    Raises:
        RuntimeError: if no import spec or loader can be created.
        Exception: whatever executing the module raises (for example
            ``FileNotFoundError`` when the file does not exist).
    """
    module_name = "doc_image_triage"
    module_path = repo_root / "openvino-doc-image-triage-npu" / "triage.py"
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        raise RuntimeError("doc_image_triage_import_failed")
    module = importlib.util.module_from_spec(spec)
    previous = sys.modules.get(module_name)
    # Register the module that is actually executed and returned.
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)  # type: ignore[union-attr]
    except BaseException:
        if previous is None:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return module
def fallback_doc_item(path: Path, root: Path, lane: str) -> dict[str, Any]:
    """Build a redacted report item using only CPU keyword rules.

    Text comes from an optional sidecar named ``<file name>.txt`` next to
    *path*. At most 12000 characters are read. The sidecar is ignored unless
    it resolves to a regular file inside *root*, so a symlinked sidecar cannot
    pull in content from elsewhere. The text itself is never returned; only a
    category, matched reason names, and counts of date-like and amount-like
    strings are.

    Raises:
        ValueError: if *path* is not located under *root*.
    """
    sidecar = path.with_suffix(path.suffix + ".txt")
    text = ""
    if sidecar.is_file() and is_under(sidecar, root):
        # Bounded read with an explicit encoding instead of loading the whole
        # file in the locale encoding and slicing afterwards.
        with sidecar.open("r", encoding="utf-8", errors="replace") as handle:
            text = handle.read(12000)
    lower = text.lower()
    category = "unknown_or_low_confidence"
    if any(w in lower for w in ("receipt", "subtotal", "store")):
        category = "receipt"
    elif any(w in lower for w in ("invoice", "amount due", "payment due")):
        category = "bill_or_invoice"
    elif lane == "screenshots":
        category = "screenshot_web_or_app"
    reasons = [name for name, rx in ACTION_PATTERNS.items() if rx.search(text)]
    dates_count = len(set(re.findall(r"\b20\d{2}[-/]\d{1,2}[-/]\d{1,2}\b", text)))
    amounts_count = len(set(re.findall(r"\$\s?\d+(?:\.\d{2})?", text)))
    return {
        "basename": path.name,
        "relative_path_hash": sha256_text(path.relative_to(root).as_posix()),
        "file_id": sha256_file(path),
        "media_type": infer_media_type(path),
        "category": category,
        "needs_attention": bool(reasons),
        "reasons": sorted(reasons),
        "raw_text_redacted": True,
        "full_path_included": False,
        "metadata": {"dates_count": dates_count, "amounts_count": amounts_count, "raw_values_redacted": True},
        "processing": {"doc_image_triage": "fallback_cpu_sidecar_rules", "npu_verified": False},
    }
def infer_media_type(path: Path) -> str:
    """Classify *path* as ``"pdf"``, ``"image"``, ``"audio"`` or ``"unknown"``.

    PDF is detected from the suffix alone; image and audio come from the MIME
    type guessed from the file name.
    """
    if path.suffix.lower() == ".pdf":
        return "pdf"
    guessed = mimetypes.guess_type(path.name)[0] or ""
    for prefix, label in (("image/", "image"), ("audio/", "audio")):
        if guessed.startswith(prefix):
            return label
    return "unknown"
def compact_doc_item(path: Path, root: Path, lane: str, triage_result: dict[str, Any]) -> dict[str, Any]:
    """Reduce a triage-module result to the redacted per-file report item.

    Only the first entry of ``triage_result["pages"]`` is consulted for the
    classification, attention flag, and metadata counts. Missing fields fall
    back to neutral defaults. The ``receipts`` lane additionally gets a
    ``receipt_fields`` summary.

    Raises:
        ValueError: if *path* is not located under *root*.
    """
    pages = triage_result.get("pages") or []
    first_page = pages[0] if pages else {}
    classification = first_page.get("classification") or {}
    attention = first_page.get("needs_attention") or {}
    page_meta = first_page.get("metadata") or {}
    devices = triage_result.get("processing_device_summary") or {}

    item: dict[str, Any] = {}
    item["basename"] = path.name
    item["relative_path_hash"] = sha256_text(path.relative_to(root).as_posix())
    # Hash the file only when the triage module did not supply an id.
    item["file_id"] = triage_result.get("file_id") or sha256_file(path)
    item["media_type"] = triage_result.get("media_type") or infer_media_type(path)
    item["category"] = classification.get("label") or "unknown_or_low_confidence"
    item["needs_attention"] = bool(attention.get("value"))
    item["reasons"] = attention.get("reasons") or []
    item["raw_text_redacted"] = True
    item["full_path_included"] = False
    item["metadata"] = {
        "dates_count": page_meta.get("dates_count", 0),
        "amounts_count": page_meta.get("amounts_count", 0),
        "raw_values_redacted": True,
    }
    item["processing"] = {
        "doc_image_triage": "openvino-doc-image-triage-npu",
        "image_category_device": classification.get("device") or "CPU",
        "needs_attention_device": attention.get("device") or "CPU",
        "npu_verified": bool(devices.get("npu_verified")),
        "npu_busy_delta_us": devices.get("npu_busy_delta_us"),
    }

    if lane == "receipts":
        entities = page_meta.get("detected_entities") or {}
        item["receipt_fields"] = {
            "vendor_present": bool(entities.get("org_present")),
            "amounts_count": item["metadata"]["amounts_count"],
            "dates_count": item["metadata"]["dates_count"],
        }
    return item
def classify_transcript(text: str, lane: str) -> dict[str, Any]:
    """Summarise a transcript without returning any of its text.

    Each counter reflects whether a pattern family matched at all, not how
    many times it matched: ``action_items_count`` is 0-2 (``follow_up`` and
    ``task``), ``decisions_count`` and ``followups_count`` are 0 or 1.
    """
    matched = {name: bool(rx.search(text)) for name, rx in ACTION_PATTERNS.items()}
    reasons = sorted(name for name, hit in matched.items() if hit)

    if lane == "meeting_snippets":
        category = "meeting_snippet"
    else:
        category = "voice_memo"

    summary: dict[str, Any] = {}
    summary["category"] = category
    summary["action_worthy"] = bool(reasons)
    summary["reasons"] = reasons
    summary["action_items_count"] = int(matched["follow_up"]) + int(matched["task"])
    summary["decisions_count"] = int(matched["decision"])
    summary["followups_count"] = int(matched["follow_up"])
    return summary
def multipart_transcribe(path: Path, whisper_url: str, timeout: float) -> dict[str, Any]:
    """POST the file at *path* to the loopback Whisper endpoint and summarise the reply.

    The URL is re-validated here. The request is sent through an opener with
    proxies disabled: ``urlopen`` would otherwise honour ``http_proxy`` and
    similar environment variables and could hand the audio to a proxy instead
    of the loopback service.

    Returns:
        A dict with ``ok``, ``text``, ``transcript_chars``,
        ``duration_seconds``, ``language``, ``npu_busy_delta_us``,
        ``verified_npu`` and ``wall_ms``. The NPU delta is the integer the
        service reports, or else the difference of the local counter read
        before and after the request.

    Raises:
        FailClosed: if *whisper_url* is not the approved loopback endpoint.
        OSError: on file read, connection, HTTP, or timeout failures.
        ValueError: if the response is not UTF-8 JSON (includes
            ``json.JSONDecodeError``).
    """
    whisper_url = validate_local_whisper_url(whisper_url)
    data = path.read_bytes()

    # Random boundary, regenerated in the unlikely case the payload contains it.
    boundary = "----NpuBatchTriage" + os.urandom(6).hex()
    while boundary.encode() in data:
        boundary = "----NpuBatchTriage" + os.urandom(6).hex()

    # Neutralise characters that would terminate or split the quoted header value.
    printable_name = path.name.encode("utf-8", errors="replace").decode("utf-8")
    safe_name = re.sub(r'[\x00-\x1f"\\]', "_", printable_name)

    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{safe_name}"\r\n'
        "Content-Type: application/octet-stream\r\n\r\n"
    ).encode()
    tail = (
        f"\r\n--{boundary}\r\n"
        'Content-Disposition: form-data; name="model"\r\n\r\n'
        "whisper-1\r\n"
        f"--{boundary}--\r\n"
    ).encode()
    body = head + data + tail

    before = read_busy()
    req = urllib.request.Request(whisper_url, data=body, headers={"Content-Type": f"multipart/form-data; boundary={boundary}"})
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    t0 = time.perf_counter()
    with opener.open(req, timeout=timeout) as resp:
        raw = resp.read(1024 * 1024)  # cap the response at 1 MiB
        status = resp.status
    parsed = json.loads(raw.decode())
    if not isinstance(parsed, dict):
        # A JSON array or scalar carries no transcript; report it as a non-ok result.
        parsed = {}
    after = read_busy()

    text = str(parsed.get("text") or parsed.get("transcription") or "").strip()
    service_delta = parsed.get("npu_busy_delta_us")
    sysfs_delta = None if before is None or after is None else after - before
    proof_delta = service_delta if isinstance(service_delta, int) else sysfs_delta
    return {
        "ok": status == 200 and bool(text),
        "text": text,
        "transcript_chars": len(text),
        "duration_seconds": parsed.get("duration_seconds"),
        "language": parsed.get("language"),
        "npu_busy_delta_us": proof_delta,
        "verified_npu": bool(proof_delta and proof_delta > 0),
        "wall_ms": round((time.perf_counter() - t0) * 1000, 2),
    }
def compact_audio_item(path: Path, root: Path, lane: str, no_npu: bool, whisper_url: str, timeout: float) -> dict[str, Any]:
    """Build the redacted report item for one audio file.

    Unless *no_npu* is set, the file is transcribed via
    ``multipart_transcribe``. The transcript is used only to derive counts
    and reason names and is never placed in the item. Transport, file, and
    response-decoding failures are recorded in ``item["error"]`` as
    ``whisper_error:<ExceptionName>`` and do not abort the batch.

    Raises:
        FailClosed: propagated unchanged if the Whisper URL is rejected.
        ValueError: if *path* is not located under *root*.
    """
    transcript = ""
    transcribed = False
    npu_delta = 0
    duration = None
    language = None
    error = None
    if not no_npu:
        try:
            result = multipart_transcribe(path, whisper_url, timeout)
        except (OSError, ValueError) as exc:
            # OSError covers URLError/HTTPError/TimeoutError; ValueError covers
            # JSONDecodeError and a response body that is not valid UTF-8.
            error = f"whisper_error:{type(exc).__name__}"
        else:
            transcript = result["text"]
            transcribed = result["ok"]
            npu_delta = result.get("npu_busy_delta_us") or 0
            duration = result.get("duration_seconds")
            language = result.get("language")
    summary = classify_transcript(transcript, lane)
    item = {
        "basename": path.name,
        "relative_path_hash": sha256_text(path.relative_to(root).as_posix()),
        "file_id": sha256_file(path),
        "media_type": "audio",
        "duration_seconds": duration,
        "transcribed": transcribed,
        "transcript_chars": len(transcript),
        "language": language,
        **summary,
        "npu_busy_delta_us": npu_delta,
        "raw_transcript_logged": False,
        "full_path_included": False,
    }
    if error:
        item["error"] = error
    return item
def process(args: argparse.Namespace) -> dict[str, Any]:
    """Run one dry-run triage pass for ``args.lane`` and return the report.

    Steps: load and validate the manifest, resolve the approved and selected
    roots, collect eligible files (newest first, capped by the manifest's
    ``max_files`` and by ``--limit``), build one redacted item per file, and
    summarise NPU evidence from the busy-time counter and per-item deltas.

    Per-file failures are recorded in ``errors`` and as ``ok: False`` items;
    they do not stop the batch.

    Raises:
        FailClosed: for manifest, lane, root, extension, or limit problems,
            and for a rejected Whisper URL.
    """
    repo_root = Path(__file__).resolve().parents[1]
    manifest_path = Path(args.manifest).expanduser().resolve()
    manifest = load_manifest(manifest_path)
    lane_cfg, approved_root, root = resolve_lane_root(manifest, manifest_path, args.lane, args.root)
    exts = allowed_exts(lane_cfg)
    if not exts:
        raise FailClosed(f"extensions_missing:{args.lane}")

    # Validate the manifest limits up front. A negative max_files would turn
    # files[:limit] into "everything except the last N", and a non-numeric
    # value would surface as a raw traceback instead of a fail-closed result.
    try:
        manifest_limit = int(lane_cfg.get("max_files", 50))
        max_file_mb = float(lane_cfg.get("max_file_mb", 25))
    except (TypeError, ValueError, OverflowError) as exc:
        raise FailClosed(f"lane_limits_invalid:{args.lane}") from exc
    if manifest_limit < 0 or not (0 <= max_file_mb < float("inf")):
        raise FailClosed(f"lane_limits_invalid:{args.lane}")
    limit = min(args.limit if args.limit is not None else manifest_limit, manifest_limit)
    limit = max(limit, 0)

    files, skipped, files_seen = iter_files(root, approved_root, exts, max_file_mb, args.max_age_days)
    selected = files[:limit]
    npu_before = read_busy()
    items: list[dict[str, Any]] = []
    errors: list[str] = []
    doc_module = None
    if args.lane in AUDIO_LANES and not args.no_npu:
        # Reject a bad endpoint once, before any file is read.
        validate_local_whisper_url(args.whisper_url)
    if args.lane in DOC_IMAGE_LANES and not args.no_npu:
        try:
            doc_module = load_doc_triage_module(repo_root)
        except Exception as exc:
            # Import failure is reported; items then use the CPU sidecar fallback.
            errors.append(f"doc_triage_import_error:{type(exc).__name__}")
    for path in selected:
        try:
            if args.lane in AUDIO_LANES:
                item = compact_audio_item(path, root, args.lane, args.no_npu, args.whisper_url, args.timeout_seconds)
            elif doc_module is not None:
                # NOTE(review): dry_run=False is passed to the triage module even
                # though this wrapper is dry-run only; presumably the flag has a
                # different meaning there. Confirm against triage.py.
                opts = doc_module.TriageOptions(
                    dry_run=False,
                    include_ocr_text=False,
                    include_full_path=False,
                    use_embeddings=not args.no_npu,
                    allowed_roots=[approved_root],
                    timeout_seconds=args.timeout_seconds,
                )
                item = compact_doc_item(path, root, args.lane, doc_module.triage_file(path, opts))
            else:
                item = fallback_doc_item(path, root, args.lane)
            if args.include_full_path:
                item["full_path"] = str(path)
                item["full_path_included"] = True
            if args.include_raw_text:
                # The request is acknowledged but raw text is never emitted.
                item["raw_text_included"] = False
                item["raw_text_note"] = "unsupported_by_batch_wrapper"
            items.append(item)
        except FailClosed:
            raise
        except Exception as exc:
            errors.append(f"{path.name}:{type(exc).__name__}")
            items.append({"basename": path.name, "ok": False, "error": type(exc).__name__, "raw_text_redacted": True, "full_path_included": False})
    npu_after = read_busy()
    sysfs_delta = None if npu_before is None or npu_after is None else npu_after - npu_before
    item_deltas = [i.get("npu_busy_delta_us") for i in items if isinstance(i.get("npu_busy_delta_us"), int)]
    claimed = not args.no_npu and any((d or 0) > 0 for d in item_deltas + ([sysfs_delta] if isinstance(sysfs_delta, int) else []))
    proof_ok = claimed and bool(sysfs_delta is None or sysfs_delta > 0 or any((d or 0) > 0 for d in item_deltas))
    return {
        "ok": not errors,
        "lane": args.lane,
        "dry_run": True,
        "approved_root": True,
        "root_basename": root.name,
        "files_seen": files_seen,
        "files_processed": len(items),
        "skipped": skipped,
        "npu": {"claimed": claimed, "busy_delta_us": sysfs_delta, "proof_ok": proof_ok},
        "mutations": MUTATIONS_FALSE.copy(),
        "items": items,
        "raw_content_redacted": not args.include_raw_text,
        "full_paths_included": bool(args.include_full_path),
        "errors": errors,
        "gates": {
            "external_uploads": False,
            "private_root_broadening": False,
            "obsidian_mutation": False,
            "vector_db_mutation": False,
            "outbound_sends": False,
            "routing_changes": False,
        },
    }
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the dry-run triage wrapper.

    Options are declared as a table and registered in order, so ``--help``
    lists them in the sequence shown here.
    """
    option_table: list[tuple[str, dict[str, Any]]] = [
        ("--manifest", {"required": True, "help": "lane approval manifest; missing/unapproved fails closed"}),
        ("--lane", {"required": True, "choices": LANES}),
        ("--root", {"help": "optional narrower root under the manifest-approved lane root"}),
        ("--dry-run", {"action": "store_true", "help": "required; mutation modes are not implemented"}),
        ("--limit", {"type": int, "default": None}),
        ("--max-age-days", {"type": float, "default": None}),
        ("--include-raw-text", {"action": "store_true", "help": "kept redacted by this wrapper; present only for explicit operator attempts"}),
        ("--include-full-path", {"action": "store_true", "help": "operator-only local debugging"}),
        ("--no-npu", {"action": "store_true", "help": "CPU-only smoke; never claims NPU"}),
        ("--json", {"action": "store_true", "help": "emit compact JSON"}),
        ("--pretty", {"action": "store_true", "help": "pretty JSON for local debugging"}),
        ("--whisper-url", {"default": DEFAULT_WHISPER_URL}),
        ("--timeout-seconds", {"type": float, "default": 20.0}),
    ]
    parser = argparse.ArgumentParser(description="Explicit-root dry-run batch triage wrapper")
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    return parser
def main(argv: list[str] | None = None) -> int:
    """Parse *argv*, run the triage pass, print the JSON report, and return an exit code.

    Usage errors (missing ``--dry-run``, non-positive ``--limit``) are written
    to stderr as JSON with exit code 2. Otherwise the report, or a
    ``fail_closed`` result, is written to stdout; the exit code is 0 only when
    the report's ``ok`` is truthy.
    """
    args = build_parser().parse_args(argv)

    usage_error = None
    if not args.dry_run:
        usage_error = {"ok": False, "error": "dry_run_required", "mutations": MUTATIONS_FALSE}
    elif args.limit is not None and args.limit < 1:
        usage_error = {"ok": False, "error": "limit_must_be_positive"}
    if usage_error is not None:
        print(json.dumps(usage_error), file=sys.stderr)
        return 2

    try:
        report = process(args)
    except FailClosed as exc:
        report = {"ok": False, "error": "fail_closed", "reason": str(exc), "dry_run": True, "mutations": MUTATIONS_FALSE.copy()}

    indent = 2 if args.pretty else None
    print(json.dumps(report, indent=indent, sort_keys=True))
    if report.get("ok"):
        return 0
    return 2
# Script entry point: main()'s integer return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())