diff --git a/config/triage-roots.example.yaml b/config/triage-roots.example.yaml new file mode 100644 index 0000000..0ddc747 --- /dev/null +++ b/config/triage-roots.example.yaml @@ -0,0 +1,52 @@ +version: 1 +policy: + default_mode: dry_run + require_explicit_root: true + allow_external_uploads: false + allow_mutations: false + log_raw_text: false + include_full_paths_default: false + npu_proof_path: /sys/class/accel/accel0/device/npu_busy_time_us + +# Copy to config/triage-roots.local.yaml and approve exactly one narrow, +# lane-specific staging root. The committed template is intentionally +# unapproved/fail-closed; do not point any lane at broad home, Downloads, +# vault, screenshot, photo-library, or historical audio roots without explicit +# approval for that exact lane/root. +roots: + screenshots: + approved: false + root: null + allowed_extensions: [.png, .jpg, .jpeg, .webp, .heic] + max_files: 50 + max_file_mb: 25 + receipts: + approved: false + root: null + allowed_extensions: [.png, .jpg, .jpeg, .pdf, .webp] + max_files: 50 + max_file_mb: 25 + downloads: + approved: false + root: null + allowed_extensions: [.pdf, .png, .jpg, .jpeg, .webp] + max_files: 50 + max_file_mb: 25 + obsidian_attachments: + approved: false + root: null + allowed_extensions: [.pdf, .png, .jpg, .jpeg, .webp, .mp3, .m4a, .wav, .ogg] + max_files: 50 + max_file_mb: 50 + voice_memos: + approved: false + root: null + allowed_extensions: [.mp3, .m4a, .wav, .ogg, .opus] + max_files: 25 + max_file_mb: 100 + meeting_snippets: + approved: false + root: null + allowed_extensions: [.mp3, .m4a, .wav, .ogg, .opus] + max_files: 25 + max_file_mb: 200 diff --git a/config/triage-roots.test.yaml b/config/triage-roots.test.yaml new file mode 100644 index 0000000..18eb1a0 --- /dev/null +++ b/config/triage-roots.test.yaml @@ -0,0 +1,46 @@ +version: 1 +policy: + default_mode: dry_run + require_explicit_root: true + allow_external_uploads: false + allow_mutations: false + log_raw_text: false + include_full_paths_default: false + npu_proof_path: /sys/class/accel/accel0/device/npu_busy_time_us +roots: + screenshots: + approved: true + root: ../openvino-doc-image-triage-npu/samples + allowed_extensions: [.png, .jpg, .jpeg, .webp, .heic] + max_files: 50 + max_file_mb: 25 + receipts: + approved: true + root: ../openvino-doc-image-triage-npu/samples + allowed_extensions: [.png, .jpg, .jpeg, .pdf, .webp] + max_files: 50 + max_file_mb: 25 + downloads: + approved: true + root: ../openvino-doc-image-triage-npu/samples + allowed_extensions: [.pdf, .png, .jpg, .jpeg, .webp] + max_files: 50 + max_file_mb: 25 + obsidian_attachments: + approved: true + root: ../openvino-doc-image-triage-npu/samples + allowed_extensions: [.pdf, .png, .jpg, .jpeg, .webp, .mp3, .m4a, .wav, .ogg] + max_files: 50 + max_file_mb: 50 + voice_memos: + approved: true + root: ../tmp/synthetic-voice-memos + allowed_extensions: [.mp3, .m4a, .wav, .ogg, .opus] + max_files: 25 + max_file_mb: 100 + meeting_snippets: + approved: true + root: ../tmp/synthetic-meeting-snippets + allowed_extensions: [.mp3, .m4a, .wav, .ogg, .opus] + max_files: 25 + max_file_mb: 200 diff --git a/docs/npu-batch-triage-dry-run.md b/docs/npu-batch-triage-dry-run.md new file mode 100644 index 0000000..1b6717f --- /dev/null +++ b/docs/npu-batch-triage-dry-run.md @@ -0,0 +1,65 @@ +# Explicit-root NPU batch triage dry-run examples + +These examples are wrappers only. They do not install cron jobs, enable services, +change Atlas/Hermes routing, write Obsidian/RAG/vector DBs, move/delete files, or +send outbound messages. + +The committed manifest template at `config/triage-roots.example.yaml` is +intentionally unapproved. For real private data, copy it to +`config/triage-roots.local.yaml` and approve exactly one narrow lane-specific +staging folder. Request-level `--root` may narrow that manifest root but cannot +broaden it. + +Synthetic document/image smoke, CPU-only/no NPU claim: + +```bash +python scripts/npu-batch-triage-dry-run.py \ + --manifest config/triage-roots.test.yaml \ + --lane screenshots \ + --root openvino-doc-image-triage-npu/samples \ + --limit 5 \ + --dry-run \ + --no-npu \ + --json +``` + +Synthetic document/image smoke with the existing local embeddings NPU service, +if `127.0.0.1:18817` is healthy. Treat NPU as proven only when `npu.proof_ok` is +true and `npu.busy_delta_us` (or item-level delta) is positive: + +```bash +python scripts/npu-batch-triage-dry-run.py \ + --manifest config/triage-roots.test.yaml \ + --lane receipts \ + --root openvino-doc-image-triage-npu/samples \ + --limit 5 \ + --dry-run \ + --json +``` + +Audio smoke should use generated/public synthetic audio only until a private +audio staging root is approved: + +```bash +python scripts/npu-batch-triage-dry-run.py \ + --manifest config/triage-roots.test.yaml \ + --lane voice_memos \ + --root tmp/synthetic-voice-memos \ + --limit 3 \ + --dry-run \ + --no-npu \ + --json +``` + +Cron/n8n shape (disabled example only): + +```text +Manual Trigger / disabled cron + -> Execute Command: python /home/will/lab/swarm/scripts/npu-batch-triage-dry-run.py --manifest /home/will/lab/swarm/config/triage-roots.local.yaml --lane receipts --limit 25 --dry-run --json + -> IF ok && npu.proof_ok && files_processed > 0 + -> local dashboard/report only +``` + +Do not connect this output to Telegram/Discord/email sends, Obsidian writes, +RAG/vector reindex, file moves/deletes, Kanban mutation, service restarts, or +Atlas/Hermes routing without a separate reviewed approval gate. diff --git a/scripts/npu-batch-triage-dry-run.py b/scripts/npu-batch-triage-dry-run.py new file mode 100755 index 0000000..0301c78 --- /dev/null +++ b/scripts/npu-batch-triage-dry-run.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 +"""Explicit-root dry-run batch triage for local documents, images, and audio. + +This wrapper is intentionally report-only. It requires a lane-scoped approved +root in a manifest, rejects request roots that broaden that approval, redacts raw +text/transcripts by default, and never mutates Obsidian, RAG/vector DBs, files, +routing, memory, services, or sends. +""" +from __future__ import annotations + +import argparse +import datetime as dt +import hashlib +import ipaddress +import importlib.util +import json +import mimetypes +import os +import re +import sys +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Any + +try: + import yaml # type: ignore +except Exception as exc: # pragma: no cover + raise SystemExit("PyYAML is required to read triage root manifests") from exc + +LANES = ( + "screenshots", + "receipts", + "downloads", + "obsidian_attachments", + "voice_memos", + "meeting_snippets", +) +AUDIO_LANES = {"voice_memos", "meeting_snippets"} +DOC_IMAGE_LANES = {"screenshots", "receipts", "downloads", "obsidian_attachments"} +SKIP_DIR_NAMES = {".git", ".obsidian", "__pycache__", ".cache", "cache", "chroma", "chromadb", "vector_db", "vectors"} +NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") +DEFAULT_WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions" +MUTATIONS_FALSE = { + "obsidian": False, + "rag": False, + "vector_db": False, + "sends": False, + "file_moves": False, + "routing": False, + "memory": False, + "service_restarts": False, +} +ACTION_PATTERNS = { + "follow_up": re.compile(r"\b(follow up|follow-up|circle back|reply|respond)\b", re.I), + "date_or_deadline": re.compile(r"\b(deadline|due|by (?:mon|tue|wed|thu|fri|sat|sun)|20\d{2}[-/]\d{1,2}[-/]\d{1,2})\b", re.I), + "decision": re.compile(r"\b(decided|decision|approved|rejected|go with|choose)\b", re.I), + "task": re.compile(r"\b(todo|to-do|action item|assign|need to|please)\b", re.I), +} + + +class FailClosed(Exception): + pass + + +def sha256_text(text: str) -> str: + return "sha256:" + hashlib.sha256(text.encode("utf-8", errors="replace")).hexdigest() + + +def sha256_file(path: Path) -> str: + h = hashlib.sha256() + with path.open("rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return "sha256:" + h.hexdigest() + + +def read_busy(path: Path = NPU_BUSY_PATH) -> int | None: + try: + return int(path.read_text().strip()) + except Exception: + return None + + +def validate_local_whisper_url(whisper_url: str) -> str: + """Fail closed unless Whisper transcription stays on the approved loopback service.""" + try: + parsed = urllib.parse.urlparse(whisper_url) + port = parsed.port + except ValueError as exc: + raise FailClosed("whisper_url_invalid") from exc + + if parsed.scheme != "http": + raise FailClosed("whisper_url_scheme_not_http") + if parsed.username or parsed.password: + raise FailClosed("whisper_url_credentials_not_allowed") + if port != 18816: + raise FailClosed("whisper_url_port_not_approved") + + host = (parsed.hostname or "").strip().lower() + if host == "localhost": + return whisper_url + try: + if ipaddress.ip_address(host).is_loopback: + return whisper_url + except ValueError: + pass + raise FailClosed("whisper_url_not_loopback") + + +def is_under(path: Path, root: Path) -> bool: + try: + path.resolve().relative_to(root.resolve()) + return True + except ValueError: + return False + + +def load_manifest(path: Path) -> dict[str, Any]: + if not path.exists(): + raise FailClosed(f"manifest_missing:{path}") + data = yaml.safe_load(path.read_text()) + if not isinstance(data, dict): + raise FailClosed("manifest_invalid:not_mapping") + if data.get("version") != 1: + raise FailClosed("manifest_invalid:version_must_be_1") + policy = data.get("policy") or {} + if policy.get("default_mode", "dry_run") != "dry_run": + raise FailClosed("policy_invalid:default_mode_not_dry_run") + for key, expected in { + "require_explicit_root": True, + "allow_external_uploads": False, + "allow_mutations": False, + "log_raw_text": False, + }.items(): + if policy.get(key) is not expected: + raise FailClosed(f"policy_invalid:{key}") + if not isinstance(data.get("roots"), dict): + raise FailClosed("manifest_invalid:roots_missing") + return data + + +def resolve_lane_root(manifest: dict[str, Any], manifest_path: Path, lane: str, requested_root: str | None) -> tuple[dict[str, Any], Path, Path]: + lane_cfg = (manifest.get("roots") or {}).get(lane) + if not isinstance(lane_cfg, dict): + raise FailClosed(f"lane_missing:{lane}") + if lane_cfg.get("approved") is not True: + raise FailClosed(f"lane_unapproved:{lane}") + root_value = lane_cfg.get("root") + if not root_value: + raise FailClosed(f"root_missing:{lane}") + approved_root = Path(str(root_value)).expanduser() + if not approved_root.is_absolute(): + approved_root = (manifest_path.parent / approved_root).resolve() + else: + approved_root = approved_root.resolve() + if not approved_root.exists() or not approved_root.is_dir(): + raise FailClosed(f"approved_root_unavailable:{lane}") + + selected_root = Path(requested_root).expanduser() if requested_root else approved_root + selected_root = selected_root.resolve() + if not selected_root.exists() or not selected_root.is_dir(): + raise FailClosed(f"request_root_unavailable:{lane}") + if not is_under(selected_root, approved_root): + raise FailClosed(f"request_root_broadens_approval:{lane}") + return lane_cfg, approved_root, selected_root + + +def allowed_exts(lane_cfg: dict[str, Any]) -> set[str]: + return {str(e).lower() if str(e).startswith(".") else "." + str(e).lower() for e in lane_cfg.get("allowed_extensions", [])} + + +def iter_files(root: Path, approved_root: Path, exts: set[str], max_file_mb: float, max_age_days: float | None) -> tuple[list[Path], dict[str, int], int]: + skipped = {"extension": 0, "size": 0, "symlink_escape": 0, "not_regular_file": 0, "too_old": 0, "policy": 0} + accepted: list[Path] = [] + files_seen = 0 + now = time.time() + max_bytes = int(max_file_mb * 1024 * 1024) + for dirpath, dirnames, filenames in os.walk(root, followlinks=False): + dirnames[:] = [d for d in dirnames if d not in SKIP_DIR_NAMES and not d.startswith(".")] + current = Path(dirpath) + if not is_under(current, approved_root): + skipped["symlink_escape"] += 1 + dirnames[:] = [] + continue + for name in filenames: + path = current / name + if name.startswith("."): + skipped["policy"] += 1 + continue + files_seen += 1 + try: + resolved = path.resolve() + except Exception: + skipped["symlink_escape"] += 1 + continue + if not is_under(resolved, approved_root): + skipped["symlink_escape"] += 1 + continue + if not resolved.is_file(): + skipped["not_regular_file"] += 1 + continue + if resolved.suffix.lower() not in exts: + skipped["extension"] += 1 + continue + try: + st = resolved.stat() + except OSError: + skipped["not_regular_file"] += 1 + continue + if st.st_size > max_bytes: + skipped["size"] += 1 + continue + if max_age_days is not None and now - st.st_mtime > max_age_days * 86400: + skipped["too_old"] += 1 + continue + accepted.append(resolved) + accepted.sort(key=lambda p: p.stat().st_mtime, reverse=True) + return accepted, skipped, files_seen + + +def load_doc_triage_module(repo_root: Path): + module_path = repo_root / "openvino-doc-image-triage-npu" / "triage.py" + spec = importlib.util.spec_from_file_location("doc_image_triage", module_path) + if spec is None or spec.loader is None: + raise RuntimeError("doc_image_triage_import_failed") + module = importlib.util.module_from_spec(spec) + sys.modules.setdefault("doc_image_triage", module) + spec.loader.exec_module(module) # type: ignore[union-attr] + return module + + +def fallback_doc_item(path: Path, root: Path, lane: str) -> dict[str, Any]: + sidecar = path.with_suffix(path.suffix + ".txt") + text = "" + if sidecar.exists() and sidecar.is_file(): + text = sidecar.read_text(errors="replace")[:12000] + lower = text.lower() + category = "unknown_or_low_confidence" + if any(w in lower for w in ("receipt", "subtotal", "store")): + category = "receipt" + elif any(w in lower for w in ("invoice", "amount due", "payment due")): + category = "bill_or_invoice" + elif lane == "screenshots": + category = "screenshot_web_or_app" + reasons = [name for name, rx in ACTION_PATTERNS.items() if rx.search(text)] + return { + "basename": path.name, + "relative_path_hash": sha256_text(path.relative_to(root).as_posix()), + "file_id": sha256_file(path), + "media_type": infer_media_type(path), + "category": category, + "needs_attention": bool(reasons), + "reasons": sorted(reasons), + "raw_text_redacted": True, + "full_path_included": False, + "metadata": {"dates_count": len(set(re.findall(r"\b20\d{2}[-/]\d{1,2}[-/]\d{1,2}\b", text))), "amounts_count": len(set(re.findall(r"\$\s?\d+(?:\.\d{2})?", text))), "raw_values_redacted": True}, + "processing": {"doc_image_triage": "fallback_cpu_sidecar_rules", "npu_verified": False}, + } + + +def infer_media_type(path: Path) -> str: + if path.suffix.lower() == ".pdf": + return "pdf" + mt, _ = mimetypes.guess_type(path.name) + if mt and mt.startswith("image/"): + return "image" + if mt and mt.startswith("audio/"): + return "audio" + return "unknown" + + +def compact_doc_item(path: Path, root: Path, lane: str, triage_result: dict[str, Any]) -> dict[str, Any]: + pages = triage_result.get("pages") or [] + first = pages[0] if pages else {} + cls = first.get("classification") or {} + attn = first.get("needs_attention") or {} + meta = first.get("metadata") or {} + device_summary = triage_result.get("processing_device_summary") or {} + item = { + "basename": path.name, + "relative_path_hash": sha256_text(path.relative_to(root).as_posix()), + "file_id": triage_result.get("file_id") or sha256_file(path), + "media_type": triage_result.get("media_type") or infer_media_type(path), + "category": cls.get("label") or "unknown_or_low_confidence", + "needs_attention": bool(attn.get("value")), + "reasons": attn.get("reasons") or [], + "raw_text_redacted": True, + "full_path_included": False, + "metadata": { + "dates_count": meta.get("dates_count", 0), + "amounts_count": meta.get("amounts_count", 0), + "raw_values_redacted": True, + }, + "processing": { + "doc_image_triage": "openvino-doc-image-triage-npu", + "image_category_device": (cls.get("device") or "CPU"), + "needs_attention_device": attn.get("device") or "CPU", + "npu_verified": bool(device_summary.get("npu_verified")), + "npu_busy_delta_us": device_summary.get("npu_busy_delta_us"), + }, + } + if lane == "receipts": + item["receipt_fields"] = {"vendor_present": bool((meta.get("detected_entities") or {}).get("org_present")), "amounts_count": item["metadata"]["amounts_count"], "dates_count": item["metadata"]["dates_count"]} + return item + + +def classify_transcript(text: str, lane: str) -> dict[str, Any]: + reasons = [name for name, rx in ACTION_PATTERNS.items() if rx.search(text)] + action_count = sum(1 for rx in (ACTION_PATTERNS["follow_up"], ACTION_PATTERNS["task"]) if rx.search(text)) + decisions = 1 if ACTION_PATTERNS["decision"].search(text) else 0 + followups = 1 if ACTION_PATTERNS["follow_up"].search(text) else 0 + return { + "category": "meeting_snippet" if lane == "meeting_snippets" else "voice_memo", + "action_worthy": bool(reasons), + "reasons": sorted(reasons), + "action_items_count": action_count, + "decisions_count": decisions, + "followups_count": followups, + } + + +def multipart_transcribe(path: Path, whisper_url: str, timeout: float) -> dict[str, Any]: + whisper_url = validate_local_whisper_url(whisper_url) + boundary = "----NpuBatchTriage" + hashlib.sha256(path.name.encode()).hexdigest()[:12] + data = path.read_bytes() + body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="{path.name}"\r\n' + "Content-Type: application/octet-stream\r\n\r\n" + ).encode() + data + ( + f"\r\n--{boundary}\r\n" + 'Content-Disposition: form-data; name="model"\r\n\r\n' + "whisper-1\r\n" + f"--{boundary}--\r\n" + ).encode() + before = read_busy() + req = urllib.request.Request(whisper_url, data=body, headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}) + t0 = time.perf_counter() + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read(1024 * 1024) + status = resp.status + parsed = json.loads(raw.decode()) + after = read_busy() + text = str(parsed.get("text") or parsed.get("transcription") or "").strip() + service_delta = parsed.get("npu_busy_delta_us") + sysfs_delta = None if before is None or after is None else after - before + proof_delta = service_delta if isinstance(service_delta, int) else sysfs_delta + return { + "ok": status == 200 and bool(text), + "text": text, + "transcript_chars": len(text), + "duration_seconds": parsed.get("duration_seconds"), + "language": parsed.get("language"), + "npu_busy_delta_us": proof_delta, + "verified_npu": bool(proof_delta and proof_delta > 0), + "wall_ms": round((time.perf_counter() - t0) * 1000, 2), + } + + +def compact_audio_item(path: Path, root: Path, lane: str, no_npu: bool, whisper_url: str, timeout: float) -> dict[str, Any]: + transcript = "" + transcribed = False + npu_delta = 0 + proof_ok = False + duration = None + language = None + error = None + if not no_npu: + try: + result = multipart_transcribe(path, whisper_url, timeout) + transcript = result["text"] + transcribed = result["ok"] + npu_delta = result.get("npu_busy_delta_us") or 0 + proof_ok = bool(result.get("verified_npu")) + duration = result.get("duration_seconds") + language = result.get("language") + except (urllib.error.URLError, TimeoutError, OSError, json.JSONDecodeError) as exc: + error = f"whisper_error:{type(exc).__name__}" + summary = classify_transcript(transcript, lane) + item = { + "basename": path.name, + "relative_path_hash": sha256_text(path.relative_to(root).as_posix()), + "file_id": sha256_file(path), + "media_type": "audio", + "duration_seconds": duration, + "transcribed": transcribed, + "transcript_chars": len(transcript), + "language": language, + **summary, + "npu_busy_delta_us": npu_delta, + "raw_transcript_logged": False, + "full_path_included": False, + } + if error: + item["error"] = error + return item + + +def process(args: argparse.Namespace) -> dict[str, Any]: + repo_root = Path(__file__).resolve().parents[1] + manifest_path = Path(args.manifest).expanduser().resolve() + manifest = load_manifest(manifest_path) + lane_cfg, approved_root, root = resolve_lane_root(manifest, manifest_path, args.lane, args.root) + exts = allowed_exts(lane_cfg) + if not exts: + raise FailClosed(f"extensions_missing:{args.lane}") + manifest_limit = int(lane_cfg.get("max_files", 50)) + limit = min(args.limit if args.limit is not None else manifest_limit, manifest_limit) + files, skipped, files_seen = iter_files(root, approved_root, exts, float(lane_cfg.get("max_file_mb", 25)), args.max_age_days) + selected = files[:limit] + npu_before = read_busy() + + items: list[dict[str, Any]] = [] + errors: list[str] = [] + doc_module = None + if args.lane in AUDIO_LANES and not args.no_npu: + validate_local_whisper_url(args.whisper_url) + if args.lane in DOC_IMAGE_LANES and not args.no_npu: + try: + doc_module = load_doc_triage_module(repo_root) + except Exception as exc: + errors.append(f"doc_triage_import_error:{type(exc).__name__}") + + for path in selected: + try: + if args.lane in AUDIO_LANES: + item = compact_audio_item(path, root, args.lane, args.no_npu, args.whisper_url, args.timeout_seconds) + elif doc_module is not None: + opts = doc_module.TriageOptions( + dry_run=False, + include_ocr_text=False, + include_full_path=False, + use_embeddings=not args.no_npu, + allowed_roots=[approved_root], + timeout_seconds=args.timeout_seconds, + ) + item = compact_doc_item(path, root, args.lane, doc_module.triage_file(path, opts)) + else: + item = fallback_doc_item(path, root, args.lane) + if args.include_full_path: + item["full_path"] = str(path) + item["full_path_included"] = True + if args.include_raw_text: + item["raw_text_included"] = False + item["raw_text_note"] = "unsupported_by_batch_wrapper" + items.append(item) + except FailClosed: + raise + except Exception as exc: + errors.append(f"{path.name}:{type(exc).__name__}") + items.append({"basename": path.name, "ok": False, "error": type(exc).__name__, "raw_text_redacted": True, "full_path_included": False}) + + npu_after = read_busy() + sysfs_delta = None if npu_before is None or npu_after is None else npu_after - npu_before + item_deltas = [i.get("npu_busy_delta_us") for i in items if isinstance(i.get("npu_busy_delta_us"), int)] + claimed = not args.no_npu and any((d or 0) > 0 for d in item_deltas + ([sysfs_delta] if isinstance(sysfs_delta, int) else [])) + proof_ok = claimed and bool(sysfs_delta is None or sysfs_delta > 0 or any((d or 0) > 0 for d in item_deltas)) + return { + "ok": not errors, + "lane": args.lane, + "dry_run": True, + "approved_root": True, + "root_basename": root.name, + "files_seen": files_seen, + "files_processed": len(items), + "skipped": skipped, + "npu": {"claimed": claimed, "busy_delta_us": sysfs_delta, "proof_ok": proof_ok}, + "mutations": MUTATIONS_FALSE.copy(), + "items": items, + "raw_content_redacted": not args.include_raw_text, + "full_paths_included": bool(args.include_full_path), + "errors": errors, + "gates": { + "external_uploads": False, + "private_root_broadening": False, + "obsidian_mutation": False, + "vector_db_mutation": False, + "outbound_sends": False, + "routing_changes": False, + }, + } + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Explicit-root dry-run batch triage wrapper") + parser.add_argument("--manifest", required=True, help="lane approval manifest; missing/unapproved fails closed") + parser.add_argument("--lane", required=True, choices=LANES) + parser.add_argument("--root", help="optional narrower root under the manifest-approved lane root") + parser.add_argument("--dry-run", action="store_true", help="required; mutation modes are not implemented") + parser.add_argument("--limit", type=int, default=None) + parser.add_argument("--max-age-days", type=float, default=None) + parser.add_argument("--include-raw-text", action="store_true", help="kept redacted by this wrapper; present only for explicit operator attempts") + parser.add_argument("--include-full-path", action="store_true", help="operator-only local debugging") + parser.add_argument("--no-npu", action="store_true", help="CPU-only smoke; never claims NPU") + parser.add_argument("--json", action="store_true", help="emit compact JSON") + parser.add_argument("--pretty", action="store_true", help="pretty JSON for local debugging") + parser.add_argument("--whisper-url", default=DEFAULT_WHISPER_URL) + parser.add_argument("--timeout-seconds", type=float, default=20.0) + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + if not args.dry_run: + print(json.dumps({"ok": False, "error": "dry_run_required", "mutations": MUTATIONS_FALSE}), file=sys.stderr) + return 2 + if args.limit is not None and args.limit < 1: + print(json.dumps({"ok": False, "error": "limit_must_be_positive"}), file=sys.stderr) + return 2 + try: + out = process(args) + except FailClosed as exc: + out = {"ok": False, "error": "fail_closed", "reason": str(exc), "dry_run": True, "mutations": MUTATIONS_FALSE.copy()} + print(json.dumps(out, indent=2 if args.pretty else None, sort_keys=True)) + return 0 if out.get("ok") else 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_npu_batch_triage_dry_run.py b/tests/test_npu_batch_triage_dry_run.py new file mode 100644 index 0000000..80810a3 --- /dev/null +++ b/tests/test_npu_batch_triage_dry_run.py @@ -0,0 +1,202 @@ +from __future__ import annotations + +import importlib.util +import json +import os +import subprocess +import sys +from pathlib import Path + +import pytest +import yaml + +ROOT = Path(__file__).resolve().parents[1] +SCRIPT = ROOT / "scripts" / "npu-batch-triage-dry-run.py" + + +def load_script_module(): + spec = importlib.util.spec_from_file_location("npu_batch_triage_dry_run", SCRIPT) + assert spec and spec.loader + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def run_cli(*args: str) -> subprocess.CompletedProcess[str]: + return subprocess.run( + [sys.executable, str(SCRIPT), *args], + cwd=ROOT, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=False, + ) + + +def manifest(tmp_path: Path, lane: str = "screenshots", approved: bool = True, root: Path | None = None, exts: list[str] | None = None) -> Path: + root = root or tmp_path / "approved" + root.mkdir(parents=True, exist_ok=True) + data = { + "version": 1, + "policy": { + "default_mode": "dry_run", + "require_explicit_root": True, + "allow_external_uploads": False, + "allow_mutations": False, + "log_raw_text": False, + }, + "roots": { + lane: { + "approved": approved, + "root": str(root), + "allowed_extensions": exts or [".png", ".txt", ".wav"], + "max_files": 10, + "max_file_mb": 5, + } + }, + } + path = tmp_path / "triage-roots.yaml" + path.write_text(yaml.safe_dump(data)) + return path + + +def test_missing_manifest_fails_closed(tmp_path: Path) -> None: + result = run_cli("--manifest", str(tmp_path / "missing.yaml"), "--lane", "screenshots", "--dry-run", "--no-npu", "--json") + out = json.loads(result.stdout) + assert result.returncode == 2 + assert out["ok"] is False + assert out["error"] == "fail_closed" + assert "manifest_missing" in out["reason"] + assert all(v is False for v in out["mutations"].values()) + + +def test_unapproved_lane_fails_closed(tmp_path: Path) -> None: + man = manifest(tmp_path, approved=False) + result = run_cli("--manifest", str(man), "--lane", "screenshots", "--dry-run", "--no-npu", "--json") + out = json.loads(result.stdout) + assert result.returncode == 2 + assert out["ok"] is False + assert "lane_unapproved:screenshots" in out["reason"] + + +def test_request_root_cannot_broaden_manifest_root(tmp_path: Path) -> None: + approved = tmp_path / "approved" / "narrow" + man = manifest(tmp_path, root=approved) + broad = tmp_path / "approved" + result = run_cli("--manifest", str(man), "--lane", "screenshots", "--root", str(broad), "--dry-run", "--no-npu", "--json") + out = json.loads(result.stdout) + assert result.returncode == 2 + assert out["ok"] is False + assert "request_root_broadens_approval:screenshots" in out["reason"] + + +def test_symlink_escape_is_skipped_and_output_redacted(tmp_path: Path) -> None: + approved = tmp_path / "approved" + approved.mkdir() + outside = tmp_path / "outside" + outside.mkdir() + (approved / "note.png").write_bytes(b"fake image") + (approved / "note.png.txt").write_text("Invoice payment due 2026-06-10 $42.00") + (outside / "secret.png").write_bytes(b"secret") + os.symlink(outside / "secret.png", approved / "escape.png") + man = manifest(tmp_path, root=approved, exts=[".png"]) + + result = run_cli("--manifest", str(man), "--lane", "screenshots", "--dry-run", "--no-npu", "--json") + out = json.loads(result.stdout) + assert result.returncode == 0 + assert out["ok"] is True + assert out["files_processed"] == 1 + assert out["skipped"]["symlink_escape"] == 1 + item = out["items"][0] + assert item["basename"] == "note.png" + assert item["raw_text_redacted"] is True + assert item["full_path_included"] is False + assert "full_path" not in item + assert "Invoice" not in json.dumps(out) + assert out["npu"]["claimed"] is False + assert all(v is False for v in out["mutations"].values()) + + +def test_committed_sample_manifest_cpu_smoke() -> None: + result = run_cli( + "--manifest", + "config/triage-roots.test.yaml", + "--lane", + "receipts", + "--root", + "openvino-doc-image-triage-npu/samples", + "--limit", + "2", + "--dry-run", + "--no-npu", + "--json", + ) + out = json.loads(result.stdout) + assert result.returncode == 0 + assert out["ok"] is True + assert out["lane"] == "receipts" + assert out["dry_run"] is True + assert out["files_processed"] == 2 + assert out["npu"] == {"busy_delta_us": None, "claimed": False, "proof_ok": False} or out["npu"]["claimed"] is False + assert all(v is False for v in out["mutations"].values()) + assert all(item.get("raw_text_redacted", True) for item in out["items"]) + + +def test_audio_lane_no_npu_does_not_transcribe_or_claim(tmp_path: Path) -> None: + approved = tmp_path / "voice" + approved.mkdir() + (approved / "memo.wav").write_bytes(b"not really wav; no-npu mode must not decode") + man = manifest(tmp_path, lane="voice_memos", root=approved, exts=[".wav"]) + result = run_cli("--manifest", str(man), "--lane", "voice_memos", "--dry-run", "--no-npu", "--json") + out = json.loads(result.stdout) + assert result.returncode == 0 + item = out["items"][0] + assert item["transcribed"] is False + assert item["raw_transcript_logged"] is False + assert out["npu"]["claimed"] is False + assert all(v is False for v in out["mutations"].values()) + + +def test_external_whisper_url_fails_closed_before_audio_read(tmp_path: Path) -> None: + module = load_script_module() + with pytest.raises(module.FailClosed, match="whisper_url_not_loopback"): + module.multipart_transcribe(tmp_path / "missing.wav", "http://example.com:18816/v1/audio/transcriptions", 0.01) + + +def test_audio_lane_rejects_external_whisper_url(tmp_path: Path) -> None: + approved = tmp_path / "voice" + approved.mkdir() + (approved / "memo.wav").write_bytes(b"synthetic audio bytes") + man = manifest(tmp_path, lane="voice_memos", root=approved, exts=[".wav"]) + result = run_cli( + "--manifest", + str(man), + "--lane", + "voice_memos", + "--dry-run", + "--whisper-url", + "https://example.com/v1/audio/transcriptions", + "--json", + ) + out = json.loads(result.stdout) + assert result.returncode == 2 + assert out["ok"] is False + assert out["error"] == "fail_closed" + assert out["reason"] == "whisper_url_scheme_not_http" + assert out["mutations"] == { + "obsidian": False, + "rag": False, + "vector_db": False, + "sends": False, + "file_moves": False, + "routing": False, + "memory": False, + "service_restarts": False, + } + + +def test_localhost_whisper_url_is_allowed() -> None: + module = load_script_module() + assert module.validate_local_whisper_url("http://localhost:18816/v1/audio/transcriptions") + assert module.validate_local_whisper_url("http://127.0.0.1:18816/v1/audio/transcriptions") + assert module.validate_local_whisper_url("http://[::1]:18816/v1/audio/transcriptions") diff --git a/tmp/synthetic-meeting-snippets/.gitkeep b/tmp/synthetic-meeting-snippets/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tmp/synthetic-voice-memos/.gitkeep b/tmp/synthetic-voice-memos/.gitkeep new file mode 100644 index 0000000..e69de29