From d2bad88596962e5337eac026c9012f561da95063 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Fri, 5 Jun 2026 15:52:43 -0700 Subject: [PATCH] feat(npu): add voice audio advisory pipeline --- docs/npu-voice-audio-pipeline.md | 135 ++++++++++ scripts/npu_voice_audio_pipeline.py | 339 +++++++++++++++++++++++++ tests/test_npu_voice_audio_pipeline.py | 170 +++++++++++++ 3 files changed, 644 insertions(+) create mode 100644 docs/npu-voice-audio-pipeline.md create mode 100755 scripts/npu_voice_audio_pipeline.py create mode 100644 tests/test_npu_voice_audio_pipeline.py diff --git a/docs/npu-voice-audio-pipeline.md b/docs/npu-voice-audio-pipeline.md new file mode 100644 index 0000000..d3aa709 --- /dev/null +++ b/docs/npu-voice-audio-pipeline.md @@ -0,0 +1,135 @@ +# NPU voice/audio local-file pipeline + +This is the first-slice local-file voice/audio path for the NPU maximization program: + +```text +local audio file or already-staged attachment + -> OpenVINO NPU Whisper (:18816) + -> OpenVINO NPU classifier (:18819) + -> explicit advisory gate + -> Atlas/Hermes only after separate approval +``` + +The implementation is `scripts/npu_voice_audio_pipeline.py`. It is a CLI wrapper only; it starts no listener and performs no outbound sends, Obsidian writes, memory writes, vector DB mutations, Kanban mutations, service restarts, platform API calls, or live Atlas/Hermes routing changes. + +## Safety gates + +Closed unless explicitly approved later: + +- Telegram/Discord fetching by bot token or attachment URL. +- Outbound messages or auto-sends. +- Obsidian/vault writes. +- Memory writes. +- Vector DB mutation or reindex. +- Automatic Kanban mutation. +- Service restarts or new persistent listeners. +- Private-directory root broadening. +- Live Atlas/Hermes routing authority changes. + +HTTP success is not NPU proof. For NPU claims, require real inference plus positive `/sys/class/accel/accel0/device/npu_busy_time_us` deltas. 
The CLI reports response deltas and observed sysfs deltas for Whisper and classifier calls. + +## Example: synthetic local WAV smoke + +```bash +cd /home/will/lab/swarm +python - <<'PY' +import math, struct, wave +path = '/tmp/npu-voice-smoke.wav' +sr = 16000 +with wave.open(path, 'wb') as w: + w.setnchannels(1) + w.setsampwidth(2) + w.setframerate(sr) + frames = bytearray() + for i in range(int(sr * 0.6)): + frames.extend(struct.pack(' Whisper NPU -> classifier NPU -> advisory gate + +No platform fetching, outbound sends, Obsidian/memory/vector writes, service +restarts, or live Atlas/Hermes routing changes are performed by this script. +""" +from __future__ import annotations + +import argparse +import ipaddress +import json +import mimetypes +import os +import re +import sys +import time +import uuid +import wave +from pathlib import Path +from typing import Any +import urllib.error +import urllib.parse +import urllib.request + +DEFAULT_WHISPER_URL = "http://127.0.0.1:18816/v1/audio/transcriptions" +DEFAULT_CLASSIFIER_URL = "http://127.0.0.1:18819/v1/classify" +NPU_BUSY_PATH = Path("/sys/class/accel/accel0/device/npu_busy_time_us") +AUDIO_EXTENSIONS = {".wav", ".ogg", ".oga", ".opus", ".mp3", ".m4a", ".mp4", ".webm", ".flac"} +ACTION_MARKERS = re.compile( + r"\b(remind|todo|to-do|task|follow[- ]?up|schedule|call|email|send|draft|inspect|check|fix|review|question|ask)\b", + re.IGNORECASE, +) + + +class PipelineError(RuntimeError): + def __init__(self, message: str, *, status: int = 1, details: dict[str, Any] | None = None): + super().__init__(message) + self.status = status + self.details = details or {} + + +def validate_loopback_endpoint(url: str, *, label: str) -> str: + """Return url when it targets an explicit local HTTP(S) endpoint. + + The pipeline reads local audio and posts transcripts/audio bytes, so endpoint + overrides must not be able to exfiltrate data to remote hosts. 
def validate_loopback_endpoint(url: str, *, label: str) -> str:
    """Return ``url`` unchanged when it targets a loopback HTTP(S) endpoint.

    The pipeline posts local audio bytes and transcripts, so endpoint
    overrides must not be able to send data to a remote host. The policy is
    intentionally narrow: the literal host ``localhost`` or an IP literal that
    ``ipaddress`` reports as loopback. Host names are never resolved.

    Args:
        url: Endpoint URL taken from the defaults or a CLI override.
        label: Service name used as the error-code prefix, for example
            ``"whisper"`` or ``"classifier"``.

    Returns:
        The original ``url`` string.

    Raises:
        PipelineError: ``{label}_url_invalid`` when the URL cannot be parsed,
            ``{label}_url_scheme_not_allowed`` for non-HTTP(S) schemes,
            ``{label}_url_missing_host`` when no host is present, and
            ``{label}_url_host_not_loopback`` for every other host.
    """
    try:
        parsed = urllib.parse.urlparse(url)
        host = parsed.hostname
    except ValueError as exc:
        # urlparse rejects malformed bracketed hosts (e.g. "http://[::1") with
        # a bare ValueError; report it through the pipeline's own error type.
        raise PipelineError(f"{label}_url_invalid") from exc

    if parsed.scheme not in {"http", "https"}:
        raise PipelineError(
            f"{label}_url_scheme_not_allowed",
            details={"url_host": host or "", "allowed_schemes": ["http", "https"]},
        )
    if not host:
        raise PipelineError(f"{label}_url_missing_host")

    # A trailing dot is the fully-qualified spelling of the same name.
    normalized = host.rstrip(".").lower()
    if normalized == "localhost":
        return url

    parse_failure: ValueError | None = None
    try:
        is_loopback = ipaddress.ip_address(normalized).is_loopback
    except ValueError as exc:
        # Not an IP literal at all: any other host name is rejected.
        parse_failure = exc
        is_loopback = False
    if not is_loopback:
        raise PipelineError(
            f"{label}_url_host_not_loopback",
            details={"url_host": host, "allowed_hosts": ["localhost", "127.0.0.0/8", "::1"]},
        ) from parse_failure
    return url


def read_npu_busy_us(path: Path | None = None) -> int | None:
    """Read the NPU busy-time counter, in microseconds, from sysfs.

    Args:
        path: File to read. ``None`` (the default) means the module-level
            ``NPU_BUSY_PATH``, looked up at call time so that an overridden
            constant is honoured.

    Returns:
        The counter as an integer, or ``None`` when the file cannot be read
        or does not contain an integer.
    """
    target = NPU_BUSY_PATH if path is None else path
    try:
        return int(target.read_text().strip())
    except (OSError, ValueError):
        return None


def delta_us(before: int | None, after: int | None) -> int | None:
    """Return ``after - before`` clamped at zero, or ``None`` if either is missing."""
    if before is None or after is None:
        return None
    return max(0, after - before)


def _escape_multipart_param(value: str) -> str:
    """Escape a form-data ``name``/``filename`` so it cannot break out of its header."""
    return str(value).replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")


def encode_multipart(fields: dict[str, str], files: dict[str, tuple[str, bytes, str]]) -> tuple[bytes, str]:
    """Encode text fields and file parts as a ``multipart/form-data`` body.

    Args:
        fields: Mapping of form field name to text value.
        files: Mapping of form field name to ``(filename, data, content_type)``.

    Returns:
        ``(body, content_type_header)`` where the header value carries the
        randomly generated boundary used in ``body``.
    """
    boundary = "----npu-voice-audio-" + uuid.uuid4().hex
    delimiter = f"--{boundary}\r\n".encode()
    chunks: list[bytes] = []

    for name, value in fields.items():
        header = f'Content-Disposition: form-data; name="{_escape_multipart_param(name)}"\r\n\r\n'
        chunks += [delimiter, header.encode(), str(value).encode(), b"\r\n"]

    for name, (filename, data, content_type) in files.items():
        # File names come from the local filesystem and may legally contain
        # quotes or line breaks; escape them so they stay inside the header.
        disposition = (
            f'Content-Disposition: form-data; name="{_escape_multipart_param(name)}"; '
            f'filename="{_escape_multipart_param(filename)}"\r\n'
        )
        safe_type = str(content_type).replace("\r", "").replace("\n", "")
        chunks += [
            delimiter,
            disposition.encode(),
            f"Content-Type: {safe_type}\r\n\r\n".encode(),
            data,
            b"\r\n",
        ]

    chunks.append(f"--{boundary}--\r\n".encode())
    return b"".join(chunks), f"multipart/form-data; boundary={boundary}"


def _send_and_parse(req: urllib.request.Request, *, label: str, timeout: int) -> dict[str, Any]:
    """Send ``req`` and return the decoded JSON object, mapping failures to PipelineError."""
    import http.client  # local import: not part of this module's import block

    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            parsed = json.loads(resp.read().decode())
    except urllib.error.HTTPError as exc:
        preview = exc.read().decode(errors="replace")[:300]
        raise PipelineError(f"{label}_http_{exc.code}", details={"body_preview": preview}) from exc
    except (OSError, http.client.HTTPException, ValueError) as exc:
        # OSError covers URLError, timeouts and connection resets while reading;
        # ValueError covers invalid JSON and undecodable response bytes.
        raise PipelineError(f"{label}_request_failed: {exc}") from exc
    if not isinstance(parsed, dict):
        # Callers use dict access on the result; reject arrays and scalars here.
        raise PipelineError(
            f"{label}_response_not_object",
            details={"response_type": type(parsed).__name__},
        )
    return parsed


def post_json(url: str, payload: dict[str, Any], *, timeout: int) -> dict[str, Any]:
    """POST ``payload`` as JSON to the classifier endpoint and return its JSON object.

    Args:
        url: Classifier endpoint; it is re-validated as loopback-only.
        payload: JSON-serialisable request body.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Raises:
        PipelineError: For a non-loopback URL, an HTTP error status
            (``classifier_http_<code>``), a transport or decoding failure
            (``classifier_request_failed: ...``) or a non-object response.
    """
    url = validate_loopback_endpoint(url, label="classifier")
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    return _send_and_parse(req, label="classifier", timeout=timeout)


def post_whisper(url: str, audio_path: Path, audio_data: bytes, language: str, *, timeout: int) -> dict[str, Any]:
    """POST audio bytes as multipart form data to the Whisper endpoint.

    Args:
        url: Whisper endpoint; it is re-validated as loopback-only.
        audio_path: Source path; only its file name and guessed MIME type are sent.
        audio_data: Raw audio bytes to upload.
        language: Value of the ``language`` form field.
        timeout: Socket timeout in seconds passed to ``urlopen``.

    Raises:
        PipelineError: For a non-loopback URL, an HTTP error status
            (``whisper_http_<code>``), a transport or decoding failure
            (``whisper_request_failed: ...``) or a non-object response.
    """
    url = validate_loopback_endpoint(url, label="whisper")
    content_type = mimetypes.guess_type(audio_path.name)[0] or "application/octet-stream"
    body, multipart_type = encode_multipart(
        {"model": "whisper-1", "language": language, "response_format": "json"},
        {"file": (audio_path.name, audio_data, content_type)},
    )
    req = urllib.request.Request(url, data=body, headers={"Content-Type": multipart_type}, method="POST")
    return _send_and_parse(req, label="whisper", timeout=timeout)
def validate_audio_path(path_text: str, *, max_bytes: int, max_audio_seconds: float | None) -> tuple[Path, int]:
    """Validate a local audio file path and return it with its size in bytes.

    Args:
        path_text: Path given by the caller; ``~`` is expanded.
        max_bytes: Largest accepted file size in bytes.
        max_audio_seconds: Longest accepted duration. Only enforced for
            ``.wav`` files; ``None`` disables the duration check.

    Returns:
        ``(path, size_in_bytes)``.

    Raises:
        PipelineError: When the path is relative, is a symlink, does not
            exist, is not a regular file, has an unsupported extension, is
            empty, exceeds ``max_bytes``, is a WAV file that cannot be
            decoded, or is a WAV file longer than ``max_audio_seconds``.
    """
    path = Path(path_text).expanduser()
    if not path.is_absolute():
        raise PipelineError("audio_path_must_be_absolute")
    if path.is_symlink():
        raise PipelineError("audio_path_must_not_be_symlink")
    if not path.exists():
        raise PipelineError("audio_path_not_found")
    if not path.is_file():
        raise PipelineError("audio_path_not_file")

    extension = path.suffix.lower()
    if extension not in AUDIO_EXTENSIONS:
        raise PipelineError("unsupported_audio_extension", details={"extension": extension})

    size = path.stat().st_size
    if size <= 0:
        raise PipelineError("audio_file_empty")
    if size > max_bytes:
        raise PipelineError("audio_file_too_large", details={"bytes": size, "max_bytes": max_bytes})

    if max_audio_seconds is not None and extension == ".wav":
        try:
            with wave.open(str(path), "rb") as wav:
                frame_count = wav.getnframes()
                frame_rate = wav.getframerate()
        except (wave.Error, EOFError) as exc:
            # A truncated RIFF header surfaces as EOFError rather than wave.Error.
            reason = str(exc) or "unexpected end of file"
            raise PipelineError(f"wav_decode_failed: {reason}") from exc
        if frame_rate <= 0:
            # A zero frame rate would otherwise fail with ZeroDivisionError.
            raise PipelineError("wav_decode_failed: invalid frame rate")
        duration = frame_count / float(frame_rate)
        if duration > max_audio_seconds:
            raise PipelineError(
                "audio_duration_too_long",
                details={"duration_seconds": round(duration, 3), "max_audio_seconds": max_audio_seconds},
            )
    return path, size


def extract_transcript(payload: dict[str, Any]) -> str:
    """Return the stripped transcript text found in a Whisper response.

    The first non-empty value of ``text``, ``transcript`` or ``transcription``
    is used. If none is set and ``segments`` is a list, the ``text`` values of
    its dict items are joined with single spaces; segments whose text is
    missing or ``None`` are skipped. A non-dict payload yields ``""``.
    """
    if not isinstance(payload, dict):
        return ""
    text = payload.get("text") or payload.get("transcript") or payload.get("transcription")
    segments = payload.get("segments")
    if not text and isinstance(segments, list):
        pieces = [seg.get("text") for seg in segments if isinstance(seg, dict)]
        # Skip null/missing texts so they do not become the literal "None".
        text = " ".join(str(piece) for piece in pieces if piece is not None)
    return str(text or "").strip()


def label_value(labels: dict[str, Any], key: str, default: Any = None) -> Any:
    """Return ``labels[key]``, unwrapping a ``{"value": ...}`` wrapper if present.

    ``default`` is returned when ``key`` is absent.
    """
    value = labels.get(key, default)
    if isinstance(value, dict) and "value" in value:
        return value.get("value")
    return value


def compact_labels(classifier_payload: dict[str, Any]) -> dict[str, Any]:
    """Reduce a classifier response to the four labels the gate consumes.

    A missing or non-dict ``labels`` entry is treated as empty, which yields
    ``None`` / ``False`` / ``"normal"`` / ``False`` for the four keys.
    """
    raw_labels = classifier_payload.get("labels")
    labels: dict[str, Any] = raw_labels if isinstance(raw_labels, dict) else {}
    return {
        "workflow_category": label_value(labels, "workflow_category"),
        "tool_needed": bool(label_value(labels, "tool_needed", False)),
        "urgency": label_value(labels, "urgency", "normal"),
        "safety_confirmation_required": bool(label_value(labels, "safety_confirmation_required", False)),
    }


def classify_text(
    *,
    classifier_url: str,
    item_id: str,
    source: str,
    title: str,
    transcript: str,
    max_transcript_chars: int,
    dry_run: bool,
    timeout: int,
) -> tuple[dict[str, Any], int | None]:
    """Send a bounded transcript to the classifier and measure NPU busy time.

    Args:
        classifier_url: Classifier endpoint passed to ``post_json``.
        item_id: Identifier sent as the request ``id``.
        source: Source label placed in the prompt text and request context.
        title: Optional title; an empty string omits the ``Title:`` line.
        transcript: Full transcript; only the leading
            ``max_transcript_chars`` characters are sent.
        max_transcript_chars: Upper bound on transcript characters sent.
            Negative values are treated as zero.
        dry_run: Forwarded as ``options.dry_run``.
        timeout: Request timeout in seconds.

    Returns:
        ``(classifier_response, sysfs_delta_us)`` where the delta is the
        busy-counter difference observed around the request, or ``None`` when
        the counter could not be read.
    """
    # Clamp so a negative bound cannot turn into a slice from the end.
    bounded_transcript = transcript[: max(0, max_transcript_chars)]
    title_line = f"Title: {title}\n" if title else ""
    text = "Voice memo transcript summary candidate.\n" f"Source: {source}\n" f"{title_line}Transcript:\n{bounded_transcript}"
    payload = {
        "id": item_id,
        "text": text,
        "context": {"source": source, "media": "audio"},
        "options": {"include_evidence": False, "dry_run": dry_run},
    }
    before = read_npu_busy_us()
    data = post_json(classifier_url, payload, timeout=timeout)
    after = read_npu_busy_us()
    return data, delta_us(before, after)


def decide_gate(transcript: str, labels: dict[str, Any], whisper_proven: bool, classifier_proven: bool) -> tuple[bool, str, str]:
    """Decide the advisory gate for a classified transcript.

    Args:
        transcript: Transcript text, searched for action keywords.
        labels: Compact labels (``tool_needed``, ``urgency``,
            ``safety_confirmation_required``).
        whisper_proven: Whether NPU use was proven for the Whisper call.
        classifier_proven: Whether NPU use was proven for the classifier call.

    Returns:
        ``(action_worthy, atlas_gate, next_gate)``. Missing NPU proof is
        checked first, then the safety-confirmation flag, then whether the
        item is action-worthy.
    """
    safety_required = bool(labels.get("safety_confirmation_required"))
    urgency = str(labels.get("urgency") or "normal").lower()
    action_worthy = (
        bool(labels.get("tool_needed"))
        or urgency in {"high", "critical"}
        or bool(ACTION_MARKERS.search(transcript))
    )
    if not (whisper_proven and classifier_proven):
        return action_worthy, "blocked_missing_npu_proof", "npu_proof_required"
    if safety_required:
        return action_worthy, "blocked_safety_confirmation_required", "human_approval_required"
    if action_worthy:
        return True, "advisory_only_not_sent", "dry_run_no_side_effects"
    return False, "suppressed_not_action_worthy", "dry_run_no_side_effects"
def _reported_delta_us(payload: dict[str, Any], key: str) -> int:
    """Return the integer busy-time delta reported under ``key``, or 0 if absent or malformed."""
    try:
        return int(payload.get(key) or 0)
    except (TypeError, ValueError, OverflowError):
        # A malformed self-reported delta must count as "no proof", not crash.
        return 0


def run_pipeline(args: argparse.Namespace) -> dict[str, Any]:
    """Run local audio through Whisper and the classifier, then apply the gate.

    Both endpoint URLs are validated as loopback-only before the audio file
    is touched. NPU use counts as proven for a service only when its reported
    busy delta is positive and the sysfs delta observed around the call is
    positive or unavailable; the classifier must additionally report a
    positive ``sysfs_npu_busy_delta_us``.

    Args:
        args: Parsed CLI namespace as produced by ``build_parser()``.

    Returns:
        A JSON-serialisable summary. The transcript, a transcript preview and
        the raw service responses are included only when the corresponding
        ``include_*`` option is set.

    Raises:
        PipelineError: For endpoint, audio-validation or service failures, or
            when Whisper returns an empty transcript.
    """
    args.whisper_url = validate_loopback_endpoint(args.whisper_url, label="whisper")
    args.classifier_url = validate_loopback_endpoint(args.classifier_url, label="classifier")
    audio_path, _audio_size = validate_audio_path(
        args.audio,
        max_bytes=args.max_bytes,
        max_audio_seconds=args.max_audio_seconds,
    )
    audio_data = audio_path.read_bytes()
    item_id = args.id or f"voice-audio-{int(time.time())}"

    whisper_before = read_npu_busy_us()
    whisper_payload = post_whisper(args.whisper_url, audio_path, audio_data, args.language, timeout=args.timeout)
    whisper_after = read_npu_busy_us()
    whisper_sysfs_delta = delta_us(whisper_before, whisper_after)
    transcript = extract_transcript(whisper_payload)
    if not transcript:
        raise PipelineError("whisper_empty_transcript")

    whisper_response_delta = _reported_delta_us(whisper_payload, "npu_busy_delta_us")
    whisper_proven = whisper_response_delta > 0 and (whisper_sysfs_delta is None or whisper_sysfs_delta > 0)

    classifier_payload, classifier_sysfs_observed = classify_text(
        classifier_url=args.classifier_url,
        item_id=item_id,
        source=args.source,
        title=args.title or "",
        transcript=transcript,
        max_transcript_chars=args.max_transcript_chars,
        dry_run=args.dry_run,
        timeout=args.timeout,
    )
    labels = compact_labels(classifier_payload)
    classifier_response_delta = _reported_delta_us(classifier_payload, "npu_busy_delta_us")
    classifier_response_sysfs_delta = _reported_delta_us(classifier_payload, "sysfs_npu_busy_delta_us")
    classifier_proven = (
        classifier_response_delta > 0
        and classifier_response_sysfs_delta > 0
        and (classifier_sysfs_observed is None or classifier_sysfs_observed > 0)
    )

    action_worthy, atlas_gate, next_gate = decide_gate(transcript, labels, whisper_proven, classifier_proven)

    output: dict[str, Any] = {
        "ok": True,
        "id": item_id,
        "source": args.source,
        "transcript_chars": len(transcript),
        "action_worthy": action_worthy,
        "atlas_gate": atlas_gate,
        "next_gate": next_gate,
        "whisper_npu_delta_us": whisper_response_delta,
        "whisper_sysfs_delta_us": whisper_sysfs_delta,
        "classifier_npu_delta_us": classifier_response_delta,
        "classifier_sysfs_delta_us": classifier_response_sysfs_delta,
        "classifier_observed_sysfs_delta_us": classifier_sysfs_observed,
        "labels": labels,
        "external_sends": 0,
        "writes": 0,
    }
    if args.include_transcript:
        output["transcript"] = transcript
    if args.include_transcript_preview_chars > 0:
        output["transcript_preview"] = transcript[: args.include_transcript_preview_chars]
    if args.include_raw:
        output["raw"] = {"whisper": whisper_payload, "classifier": classifier_payload}
    return output


def _positive_int(text: str) -> int:
    """argparse type: an integer strictly greater than zero."""
    value = int(text)
    if value <= 0:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {text!r}")
    return value


def _non_negative_int(text: str) -> int:
    """argparse type: an integer greater than or equal to zero."""
    value = int(text)
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be zero or a positive integer, got {text!r}")
    return value


def _positive_float(text: str) -> float:
    """argparse type: a number strictly greater than zero."""
    value = float(text)
    if not value > 0:  # also rejects NaN
        raise argparse.ArgumentTypeError(f"must be a positive number, got {text!r}")
    return value


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the advisory pipeline.

    Size, duration, length and timeout limits are validated at parse time so
    that zero or negative values are rejected with a usage error.
    """
    parser = argparse.ArgumentParser(description="Run local-file audio through NPU Whisper and NPU classifier in dry-run advisory mode.")
    parser.add_argument("--audio", required=True, help="Absolute path to a local audio file; no URL/platform fetching is performed.")
    parser.add_argument("--id", default="", help="Optional stable item id for classifier correlation.")
    parser.add_argument(
        "--source",
        default="local_file",
        choices=["local_file", "manual_smoke", "local_voice_memo", "meeting_snippet", "staged_telegram", "staged_discord"],
        help="Local/staged source label only.",
    )
    parser.add_argument("--title", default="", help="Optional short local title for classifier context.")
    parser.add_argument("--language", default="en")
    parser.add_argument("--whisper-url", default=DEFAULT_WHISPER_URL)
    parser.add_argument("--classifier-url", default=DEFAULT_CLASSIFIER_URL)
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True, help="Keep classifier in dry-run advisory mode (default).")
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false", help="Send dry_run=false to classifier; this script still performs no side effects.")
    # Accepted for compatibility; output is always compact JSON.
    parser.add_argument("--json", action="store_true", help="Emit compact JSON; default is JSON for machine-safe handoff.")
    parser.add_argument("--include-transcript", action="store_true", help="Include full transcript in output; off by default.")
    parser.add_argument("--include-transcript-preview-chars", type=_non_negative_int, default=0, help="Include a bounded transcript preview; default 0.")
    parser.add_argument("--include-raw", action="store_true", help="Include raw service responses for one-off local debugging; off by default.")
    parser.add_argument("--max-bytes", type=_positive_int, default=25 * 1024 * 1024)
    parser.add_argument("--max-audio-seconds", type=_positive_float, default=300.0, help="Enforced for WAV inputs; other codecs remain size-capped.")
    parser.add_argument("--max-transcript-chars", type=_positive_int, default=6000)
    parser.add_argument("--timeout", type=_positive_int, default=300)
    return parser


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: print the result as JSON and return the exit status.

    Success is written to stdout and returns 0. Failures are written to
    stderr as a JSON object with ``"ok": false``; a ``PipelineError`` returns
    its ``status`` and an ``OSError`` returns 1.
    """
    args = build_parser().parse_args(argv)
    try:
        result = run_pipeline(args)
    except PipelineError as exc:
        # Details go first so they can never overwrite the canonical keys.
        failure = {**exc.details, "ok": False, "error": str(exc), "external_sends": 0, "writes": 0}
        print(json.dumps(failure, ensure_ascii=False, sort_keys=True), file=sys.stderr)
        return exc.status
    except OSError as exc:
        # e.g. the audio file became unreadable between validation and read;
        # keep the machine-readable error contract instead of a traceback.
        failure = {"ok": False, "error": f"io_error: {exc}", "external_sends": 0, "writes": 0}
        print(json.dumps(failure, ensure_ascii=False, sort_keys=True), file=sys.stderr)
        return 1
    print(json.dumps(result, ensure_ascii=False, sort_keys=True))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
import importlib.util
import json
import sys
import types
import unittest
from argparse import Namespace
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import cast
from unittest import mock

MODULE_NAME = "npu_voice_audio_pipeline"
MODULE_PATH = Path(__file__).resolve().parents[1] / "scripts" / "npu_voice_audio_pipeline.py"


def load_module():
    """Load the pipeline script from ``MODULE_PATH`` as a fresh module object."""
    spec = importlib.util.spec_from_file_location(MODULE_NAME, MODULE_PATH)
    assert spec is not None and spec.loader is not None
    module = importlib.util.module_from_spec(spec)
    # Registered before execution so the module can be found by name while it loads.
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return cast(types.ModuleType, module)


class NpuVoiceAudioPipelineTests(unittest.TestCase):
    """Unit tests for the local-file voice/audio advisory pipeline script."""

    def setUp(self):
        self.pipeline = load_module()
        # Do not leave the freshly loaded script registered after the test.
        self.addCleanup(sys.modules.pop, MODULE_NAME, None)

    @staticmethod
    def _make_args(**overrides):
        """Return a Namespace shaped like parsed CLI args, with ``overrides`` applied."""
        values = {
            "audio": "/tmp/does-not-exist-remote-rejection-smoke.ogg",
            "id": "voice-smoke",
            "source": "local_file",
            "title": "synthetic smoke",
            "language": "en",
            "whisper_url": "http://127.0.0.1:18816/v1/audio/transcriptions",
            "classifier_url": "http://127.0.0.1:18819/v1/classify",
            "dry_run": True,
            "include_transcript": False,
            "include_transcript_preview_chars": 0,
            "include_raw": False,
            "max_bytes": 1024 * 1024,
            "max_audio_seconds": 300,
            "max_transcript_chars": 6000,
            "timeout": 1,
        }
        values.update(overrides)
        return Namespace(**values)

    def test_rejects_relative_audio_path(self):
        with self.assertRaisesRegex(self.pipeline.PipelineError, "audio_path_must_be_absolute"):
            self.pipeline.validate_audio_path("memo.wav", max_bytes=1024, max_audio_seconds=300)

    def test_rejects_symlink_audio_path(self):
        with TemporaryDirectory() as tmp:
            root = Path(tmp)
            target = root / "memo.wav"
            target.write_bytes(b"RIFFfake")
            link = root / "link.wav"
            try:
                link.symlink_to(target)
            except (OSError, NotImplementedError) as exc:
                self.skipTest(f"symlinks unavailable: {exc}")
            with self.assertRaisesRegex(self.pipeline.PipelineError, "audio_path_must_not_be_symlink"):
                self.pipeline.validate_audio_path(str(link), max_bytes=1024, max_audio_seconds=None)

    def test_compact_labels_unwraps_classifier_label_values(self):
        labels = self.pipeline.compact_labels(
            {
                "labels": {
                    "workflow_category": {"value": "media"},
                    "tool_needed": {"value": True},
                    "urgency": {"value": "high"},
                    "safety_confirmation_required": {"value": False},
                }
            }
        )
        self.assertEqual(labels["workflow_category"], "media")
        self.assertTrue(labels["tool_needed"])
        self.assertEqual(labels["urgency"], "high")
        self.assertFalse(labels["safety_confirmation_required"])

    def test_gate_blocks_missing_npu_proof(self):
        action_worthy, atlas_gate, next_gate = self.pipeline.decide_gate(
            "remind me to review logs",
            {"tool_needed": True, "urgency": "normal", "safety_confirmation_required": False},
            whisper_proven=False,
            classifier_proven=True,
        )
        self.assertTrue(action_worthy)
        self.assertEqual(atlas_gate, "blocked_missing_npu_proof")
        self.assertEqual(next_gate, "npu_proof_required")

    def test_loopback_endpoint_policy_accepts_local_urls(self):
        allowed = [
            "http://localhost:18816/v1/audio/transcriptions",
            "https://localhost:18816/v1/audio/transcriptions",
            "http://127.0.0.1:18816/v1/audio/transcriptions",
            "http://127.42.0.9:18816/v1/audio/transcriptions",
            "http://[::1]:18816/v1/audio/transcriptions",
        ]
        for url in allowed:
            with self.subTest(url=url):
                self.assertEqual(self.pipeline.validate_loopback_endpoint(url, label="whisper"), url)

    def test_loopback_endpoint_policy_rejects_remote_urls(self):
        rejected = [
            "http://example.com:18816/v1/audio/transcriptions",
            "https://10.0.0.5:18816/v1/audio/transcriptions",
            "http://192.168.1.10:18816/v1/audio/transcriptions",
            "http://[2001:db8::1]:18816/v1/audio/transcriptions",
            "file:///tmp/audio.wav",
        ]
        for url in rejected:
            with self.subTest(url=url):
                with self.assertRaisesRegex(self.pipeline.PipelineError, "whisper_url_.*not_.*|whisper_url_scheme_not_allowed"):
                    self.pipeline.validate_loopback_endpoint(url, label="whisper")

    def test_run_pipeline_rejects_remote_url_before_audio_read(self):
        # The audio path does not exist, so reaching audio validation would
        # raise a different error than the one asserted here.
        args = self._make_args(whisper_url="http://example.com:18816/v1/audio/transcriptions")
        with self.assertRaisesRegex(self.pipeline.PipelineError, "whisper_url_host_not_loopback"):
            self.pipeline.run_pipeline(args)

    def test_run_pipeline_compact_success_with_mocked_services(self):
        classifier_response = {
            "dry_run": True,
            "labels": {
                "workflow_category": {"value": "media"},
                "tool_needed": {"value": True},
                "urgency": {"value": "normal"},
                "safety_confirmation_required": {"value": False},
            },
            "npu_busy_delta_us": 75,
            "sysfs_npu_busy_delta_us": 75,
        }
        with TemporaryDirectory() as tmp:
            audio = Path(tmp) / "memo.ogg"
            audio.write_bytes(b"not-real-audio-but-services-are-mocked")
            args = self._make_args(audio=str(audio))
            # Counter readings: before/after Whisper, then before/after classifier.
            with mock.patch.multiple(
                self.pipeline,
                read_npu_busy_us=mock.Mock(side_effect=[100, 150, 150, 225]),
                post_whisper=mock.Mock(return_value={"text": "remind me to check npu logs", "npu_busy_delta_us": 50}),
                post_json=mock.Mock(return_value=classifier_response),
            ):
                result = self.pipeline.run_pipeline(args)
        self.assertTrue(result["ok"])
        self.assertEqual(result["external_sends"], 0)
        self.assertEqual(result["writes"], 0)
        self.assertEqual(result["whisper_sysfs_delta_us"], 50)
        self.assertEqual(result["classifier_observed_sysfs_delta_us"], 75)
        self.assertEqual(result["atlas_gate"], "advisory_only_not_sent")
        self.assertNotIn("transcript", result)
        json.dumps(result)


if __name__ == "__main__":
    unittest.main()