#!/usr/bin/env python3 """Local-file voice/audio NPU advisory pipeline. Side-effect-free first slice: local audio file -> Whisper NPU -> classifier NPU -> advisory gate No platform fetching, outbound sends, Obsidian/memory/vector writes, service restarts, or live Atlas/Hermes routing changes are performed by this script. """ from __future__ import annotations import argparse import ipaddress import json import mimetypes import os import re import sys import time import uuid import wave from pathlib import Path from typing import Any import urllib.error import urllib.parse import urllib.request DEFAULT_WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions" DEFAULT_CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify" NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") AUDIO_EXTENSIONS = {".wav", ".ogg", ".oga", ".opus", ".mp3", ".m4a", ".mp4", ".webm", ".flac"} ACTION_MARKERS = re.compile( r"\b(remind|todo|to-do|task|follow[- ]?up|schedule|call|email|send|draft|inspect|check|fix|review|question|ask)\b", re.IGNORECASE, ) class PipelineError(RuntimeError): def __init__(self, message: str, *, status: int = 1, details: dict[str, Any] | None = None): super().__init__(message) self.status = status self.details = details or {} def validate_loopback_endpoint(url: str, *, label: str) -> str: """Return url when it targets an explicit local HTTP(S) endpoint. The pipeline reads local audio and posts transcripts/audio bytes, so endpoint overrides must not be able to exfiltrate data to remote hosts. Keep the policy intentionally narrow: localhost, IPv4 loopback, or IPv6 ::1 only. """ parsed = urllib.parse.urlparse(url) if parsed.scheme not in {"http", "https"}: raise PipelineError( f"{label}_url_scheme_not_allowed", details={"url_host": parsed.hostname or "", "allowed_schemes": ["http", "https"]}, ) host = parsed.hostname if not host: raise PipelineError(f"{label}_url_missing_host") normalized = host.rstrip(".").lower() if normalized == "localhost": return url try: address = ipaddress.ip_address(normalized) except ValueError as exc: raise PipelineError( f"{label}_url_host_not_loopback", details={"url_host": host, "allowed_hosts": ["localhost", "127.0.0.0/8", "::1"]}, ) from exc if not address.is_loopback: raise PipelineError( f"{label}_url_host_not_loopback", details={"url_host": host, "allowed_hosts": ["localhost", "127.0.0.0/8", "::1"]}, ) return url def read_npu_busy_us(path: Path = NPU_BUSY_PATH) -> int | None: try: return int(path.read_text().strip()) except (OSError, ValueError): return None def delta_us(before: int | None, after: int | None) -> int | None: if before is None or after is None: return None return max(0, after - before) def encode_multipart(fields: dict[str, str], files: dict[str, tuple[str, bytes, str]]) -> tuple[bytes, str]: boundary = "----npu-voice-audio-" + uuid.uuid4().hex parts: list[bytes] = [] for name, value in fields.items(): parts.append(f"--{boundary}\r\n".encode()) parts.append(f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode()) parts.append(str(value).encode()) parts.append(b"\r\n") for name, (filename, data, content_type) in files.items(): parts.append(f"--{boundary}\r\n".encode()) parts.append(f'Content-Disposition: form-data; name="{name}"; filename="{filename}"\r\n'.encode()) parts.append(f"Content-Type: {content_type}\r\n\r\n".encode()) parts.append(data) parts.append(b"\r\n") parts.append(f"--{boundary}--\r\n".encode()) return b"".join(parts), f"multipart/form-data; boundary={boundary}" def post_json(url: str, payload: dict[str, Any], *, timeout: int) -> dict[str, Any]: url = validate_loopback_endpoint(url, label="classifier") req = urllib.request.Request( url, data=json.dumps(payload).encode(), headers={"Content-Type": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:300] raise PipelineError(f"classifier_http_{exc.code}", details={"body_preview": body}) from exc except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc: raise PipelineError(f"classifier_request_failed: {exc}") from exc def post_whisper(url: str, audio_path: Path, audio_data: bytes, language: str, *, timeout: int) -> dict[str, Any]: url = validate_loopback_endpoint(url, label="whisper") content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream" body, multipart_type = encode_multipart( {"model": "whisper-1", "language": language, "response_format": "json"}, {"file": (audio_path.name, audio_data, content_type)}, ) req = urllib.request.Request(url, data=body, headers={"Content-Type": multipart_type}, method="POST") try: with urllib.request.urlopen(req, timeout=timeout) as resp: return json.loads(resp.read().decode()) except urllib.error.HTTPError as exc: body = exc.read().decode(errors="replace")[:300] raise PipelineError(f"whisper_http_{exc.code}", details={"body_preview": body}) from exc except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc: raise PipelineError(f"whisper_request_failed: {exc}") from exc def validate_audio_path(path_text: str, *, max_bytes: int, max_audio_seconds: float | None) -> tuple[Path, int]: path = Path(path_text).expanduser() if not path.is_absolute(): raise PipelineError("audio_path_must_be_absolute") if path.is_symlink(): raise PipelineError("audio_path_must_not_be_symlink") if not path.exists(): raise PipelineError("audio_path_not_found") if not path.is_file(): raise PipelineError("audio_path_not_file") if path.suffix.lower() not in AUDIO_EXTENSIONS: raise PipelineError("unsupported_audio_extension", details={"extension": path.suffix.lower()}) size = path.stat().st_size if size <= 0: raise PipelineError("audio_file_empty") if size > max_bytes: raise PipelineError("audio_file_too_large", details={"bytes": size, "max_bytes": max_bytes}) if max_audio_seconds is not None and path.suffix.lower() == ".wav": try: with wave.open(str(path), "rb") as wav: duration = wav.getnframes() / float(wav.getframerate()) except wave.Error as exc: raise PipelineError(f"wav_decode_failed: {exc}") from exc if duration > max_audio_seconds: raise PipelineError("audio_duration_too_long", details={"duration_seconds": round(duration, 3), "max_audio_seconds": max_audio_seconds}) return path, size def extract_transcript(payload: dict[str, Any]) -> str: text = payload.get("text") or payload.get("transcript") or payload.get("transcription") if not text and isinstance(payload.get("segments"), list): text = " ".join(str(seg.get("text", "")) for seg in payload["segments"] if isinstance(seg, dict)) return str(text or "").strip() def label_value(labels: dict[str, Any], key: str, default: Any = None) -> Any: value = labels.get(key, default) if isinstance(value, dict) and "value" in value: return value.get("value") return value def compact_labels(classifier_payload: dict[str, Any]) -> dict[str, Any]: raw_labels = classifier_payload.get("labels") labels: dict[str, Any] = raw_labels if isinstance(raw_labels, dict) else {} return { "workflow_category": label_value(labels, "workflow_category"), "tool_needed": bool(label_value(labels, "tool_needed", False)), "urgency": label_value(labels, "urgency", "normal"), "safety_confirmation_required": bool(label_value(labels, "safety_confirmation_required", False)), } def classify_text( *, classifier_url: str, item_id: str, source: str, title: str, transcript: str, max_transcript_chars: int, dry_run: bool, timeout: int, ) -> tuple[dict[str, Any], int | None]: bounded_transcript = transcript[:max_transcript_chars] title_line = f"Title: {title}\n" if title else "" text = "Voice memo transcript summary candidate.\n" f"Source: {source}\n" f"{title_line}Transcript:\n{bounded_transcript}" payload = { "id": item_id, "text": text, "context": {"source": source, "media": "audio"}, "options": {"include_evidence": False, "dry_run": dry_run}, } before = read_npu_busy_us() data = post_json(classifier_url, payload, timeout=timeout) after = read_npu_busy_us() return data, delta_us(before, after) def decide_gate(transcript: str, labels: dict[str, Any], whisper_proven: bool, classifier_proven: bool) -> tuple[bool, str, str]: safety_required = bool(labels.get("safety_confirmation_required")) urgency = str(labels.get("urgency") or "normal").lower() action_worthy = bool(labels.get("tool_needed")) or urgency in {"high", "critical"} or bool(ACTION_MARKERS.search(transcript)) if not whisper_proven or not classifier_proven: return action_worthy, "blocked_missing_npu_proof", "npu_proof_required" if safety_required: return action_worthy, "blocked_safety_confirmation_required", "human_approval_required" if action_worthy: return True, "advisory_only_not_sent", "dry_run_no_side_effects" return False, "suppressed_not_action_worthy", "dry_run_no_side_effects" def run_pipeline(args: argparse.Namespace) -> dict[str, Any]: args.whisper_url = validate_loopback_endpoint(args.whisper_url, label="whisper") args.classifier_url = validate_loopback_endpoint(args.classifier_url, label="classifier") audio_path, audio_bytes = validate_audio_path( args.audio, max_bytes=args.max_bytes, max_audio_seconds=args.max_audio_seconds, ) audio_data = audio_path.read_bytes() item_id = args.id or f"voice-audio-{int(time.time())}" whisper_before = read_npu_busy_us() whisper_payload = post_whisper(args.whisper_url, audio_path, audio_data, args.language, timeout=args.timeout) whisper_after = read_npu_busy_us() whisper_sysfs_delta = delta_us(whisper_before, whisper_after) transcript = extract_transcript(whisper_payload) if not transcript: raise PipelineError("whisper_empty_transcript") whisper_response_delta = int(whisper_payload.get("npu_busy_delta_us") or 0) whisper_proven = whisper_response_delta > 0 and (whisper_sysfs_delta is None or whisper_sysfs_delta > 0) classifier_payload, classifier_sysfs_observed = classify_text( classifier_url=args.classifier_url, item_id=item_id, source=args.source, title=args.title or "", transcript=transcript, max_transcript_chars=args.max_transcript_chars, dry_run=args.dry_run, timeout=args.timeout, ) labels = compact_labels(classifier_payload) classifier_response_delta = int(classifier_payload.get("npu_busy_delta_us") or 0) classifier_response_sysfs_delta = int(classifier_payload.get("sysfs_npu_busy_delta_us") or 0) classifier_proven = classifier_response_delta > 0 and classifier_response_sysfs_delta > 0 and (classifier_sysfs_observed is None or classifier_sysfs_observed > 0) action_worthy, atlas_gate, next_gate = decide_gate(transcript, labels, whisper_proven, classifier_proven) output: dict[str, Any] = { "ok": True, "id": item_id, "source": args.source, "transcript_chars": len(transcript), "action_worthy": action_worthy, "atlas_gate": atlas_gate, "next_gate": next_gate, "whisper_npu_delta_us": whisper_response_delta, "whisper_sysfs_delta_us": whisper_sysfs_delta, "classifier_npu_delta_us": classifier_response_delta, "classifier_sysfs_delta_us": classifier_response_sysfs_delta, "classifier_observed_sysfs_delta_us": classifier_sysfs_observed, "labels": labels, "external_sends": 0, "writes": 0, } if args.include_transcript: output["transcript"] = transcript if args.include_transcript_preview_chars > 0: output["transcript_preview"] = transcript[: args.include_transcript_preview_chars] if args.include_raw: output["raw"] = {"whisper": whisper_payload, "classifier": classifier_payload} return output def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Run local-file audio through NPU Whisper and NPU classifier in dry-run advisory mode.") parser.add_argument("--audio", required=True, help="Absolute path to a local audio file; no URL/platform fetching is performed.") parser.add_argument("--id", default="", help="Optional stable item id for classifier correlation.") parser.add_argument("--source", default="local_file", choices=["local_file", "manual_smoke", "local_voice_memo", "meeting_snippet", "staged_telegram", "staged_discord"], help="Local/staged source label only.") parser.add_argument("--title", default="", help="Optional short local title for classifier context.") parser.add_argument("--language", default="en") parser.add_argument("--whisper-url", default=DEFAULT_WHISPER_URL) parser.add_argument("--classifier-url", default=DEFAULT_CLASSIFIER_URL) parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True, help="Keep classifier in dry-run advisory mode (default).") parser.add_argument("--no-dry-run", dest="dry_run", action="store_false", help="Send dry_run=false to classifier; this script still performs no side effects.") parser.add_argument("--json", action="store_true", help="Emit compact JSON; default is JSON for machine-safe handoff.") parser.add_argument("--include-transcript", action="store_true", help="Include full transcript in output; off by default.") parser.add_argument("--include-transcript-preview-chars", type=int, default=0, help="Include a bounded transcript preview; default 0.") parser.add_argument("--include-raw", action="store_true", help="Include raw service responses for one-off local debugging; off by default.") parser.add_argument("--max-bytes", type=int, default=25 * 1024 * 1024) parser.add_argument("--max-audio-seconds", type=float, default=300.0, help="Enforced for WAV inputs; other codecs remain size-capped.") parser.add_argument("--max-transcript-chars", type=int, default=6000) parser.add_argument("--timeout", type=int, default=300) return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) try: result = run_pipeline(args) print(json.dumps(result, ensure_ascii=False, sort_keys=True)) return 0 except PipelineError as exc: result = {"ok": False, "error": str(exc), "external_sends": 0, "writes": 0, **exc.details} print(json.dumps(result, ensure_ascii=False, sort_keys=True), file=sys.stderr) return exc.status if __name__ == "__main__": raise SystemExit(main())