#!/usr/bin/env python3
"""
Voice Memo Processor Endpoint

Handles voice memo processing with support for:
- Audio URL (existing behavior)
- Telegram voice messages (file_id)
- Discord audio attachments (discord_audio_url)
- Kokoro TTS read-back of summaries

Listens on 0.0.0.0:18813 (configurable via PORT env var).

Endpoints:
  POST /process           -> Process voice memo (download + transcribe + summarize + optional TTS)
  POST /tts               -> Generate TTS audio from text (Kokoro)
  GET  /audio/{filename}  -> Serve generated audio file
  GET  /healthz           -> Health check
"""

import datetime
import hashlib
import http.server
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.error
import urllib.parse
import urllib.request

PORT = int(os.environ.get("PORT", 18813))
AUDIO_DIR = os.path.join(tempfile.gettempdir(), "voice-memo-audio")
os.makedirs(AUDIO_DIR, exist_ok=True)

# Service endpoints (from host perspective)
WHISPER_URL = os.environ.get("WHISPER_URL", "http://127.0.0.1:18816/v1/audio/transcriptions")
LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:18806/v1/chat/completions")
KOKORO_URL = os.environ.get("KOKORO_URL", "http://127.0.0.1:18805/v1/audio/speech")

# Telegram Bot API: candidate .env files, searched in order.
_token_paths = [
    os.path.expanduser("~/.hermes/.env"),
    os.path.expanduser("~/lab/swarm/.env"),
]


def _load_telegram_token(paths):
    """Return the first non-empty TELEGRAM_BOT_TOKEN found in *paths*, else "".

    Only the first ``TELEGRAM_BOT_TOKEN=`` line of each file is considered;
    surrounding whitespace and quotes are stripped from its value. A file
    whose value is empty does not stop the search.
    """
    for path in paths:
        if not os.path.isfile(path):
            continue
        token = ""
        with open(path) as env_file:
            for line in env_file:
                line = line.strip()
                if line.startswith("TELEGRAM_BOT_TOKEN="):
                    token = line.split("=", 1)[1].strip().strip('"').strip("'")
                    break
        if token:
            return token
    return ""


TELEGRAM_BOT_TOKEN = _load_telegram_token(_token_paths)


def _json_response(handler, data, status=200):
    """Send *data* as an indented JSON body with the given HTTP status."""
    body = json.dumps(data, indent=2).encode()
    handler.send_response(status)
    handler.send_header("Content-Type", "application/json")
    handler.send_header("Content-Length", str(len(body)))
    handler.end_headers()
    handler.wfile.write(body)


def _file_response(handler, filepath, content_type="audio/mpeg"):
    """Send the whole file at *filepath* as a 200 response."""
    with open(filepath, "rb") as f:
        data = f.read()
    handler.send_response(200)
    handler.send_header("Content-Type", content_type)
    handler.send_header("Content-Length", str(len(data)))
    handler.end_headers()
    handler.wfile.write(data)


def _str_field(data: dict, key: str) -> str:
    """Return ``data[key]`` stripped, or "" when it is missing or not a string."""
    value = data.get(key, "")
    return value.strip() if isinstance(value, str) else ""


def _resolve_audio_file(filename: str):
    """Map a requested name to an existing file directly inside AUDIO_DIR.

    Returns the file path, or None when the name is empty, contains a
    directory component (e.g. ``../x``), or does not name a regular file.
    The name comes straight from the request path, so anything other than
    a bare file name is refused to keep reads inside AUDIO_DIR.
    """
    if not filename or filename in (".", ".."):
        return None
    if os.path.basename(filename) != filename:
        return None
    filepath = os.path.join(AUDIO_DIR, filename)
    return filepath if os.path.isfile(filepath) else None


def _write_temp_audio(audio_data: bytes, ext: str) -> str:
    """Write *audio_data* to a new file in AUDIO_DIR and return its path."""
    with tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR) as tmp:
        tmp.write(audio_data)
    return tmp.name


def download_telegram_voice(file_id: str) -> str:
    """Download a Telegram voice file by file_id, return local path.

    Raises:
        ValueError: if no bot token is configured or getFile reports failure.
    """
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN not configured")

    # Get file path. file_id is caller-supplied, so percent-encode it rather
    # than splicing it raw into the query string.
    url = (
        f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile"
        f"?file_id={urllib.parse.quote(file_id, safe='')}"
    )
    with urllib.request.urlopen(url, timeout=15) as resp:
        data = json.loads(resp.read())
    if not data.get("ok"):
        raise ValueError(f"Telegram getFile failed: {data}")
    file_path = data["result"]["file_path"]

    # Download the file
    download_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
    with urllib.request.urlopen(download_url, timeout=60) as resp:
        audio_data = resp.read()

    # Save to temp file with appropriate extension
    ext = os.path.splitext(file_path)[1] or ".ogg"
    return _write_temp_audio(audio_data, ext)


def download_audio_url(url: str) -> str:
    """Download audio from an http(s) URL, return local path.

    The file extension defaults to ``.mp3``, is taken from the URL path when
    it is a known audio extension, and is finally overridden by the response
    Content-Type when that names a recognised format.

    Raises:
        ValueError: if the URL scheme is not http or https.
    """
    parsed = urllib.parse.urlparse(url)
    # urlopen() also understands file:// and ftp://; the URL comes from the
    # request body, so restrict it to web schemes.
    if parsed.scheme.lower() not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme or '(none)'}")

    ext = ".mp3"
    path_ext = os.path.splitext(parsed.path)[1]
    if path_ext in (".ogg", ".oga", ".opus", ".wav", ".m4a", ".webm", ".flac"):
        ext = path_ext

    req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
    with urllib.request.urlopen(req, timeout=60) as resp:
        audio_data = resp.read()
        ct = resp.headers.get("Content-Type", "")

    # Check content type for better extension guess
    if "ogg" in ct:
        ext = ".ogg"
    elif "webm" in ct:
        ext = ".webm"
    elif "wav" in ct:
        ext = ".wav"
    elif "mp4" in ct or "m4a" in ct:
        ext = ".m4a"

    return _write_temp_audio(audio_data, ext)


def transcribe_whisper(audio_path: str) -> str:
    """Transcribe audio file using local Whisper.

    Posts the file as multipart/form-data to WHISPER_URL and reads the
    transcript from the ``text``, ``transcription`` or ``segments`` key of
    the JSON reply, in that order of preference.

    Raises:
        ValueError: if the reply contains no transcript text.
    """
    filename = os.path.basename(audio_path)

    # Build multipart form data. The random suffix keeps the boundary from
    # colliding with bytes that happen to occur in the audio payload.
    boundary = "----VoiceMemoBoundary" + os.urandom(8).hex()
    with open(audio_path, "rb") as f:
        file_data = f.read()

    body = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        f"Content-Type: application/octet-stream\r\n\r\n"
    ).encode() + file_data + (
        f"\r\n--{boundary}\r\n"
        f'Content-Disposition: form-data; name="model"\r\n\r\n'
        f"whisper-1\r\n"
        f"--{boundary}--\r\n"
    ).encode()

    req = urllib.request.Request(
        WHISPER_URL,
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
        },
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        result = json.loads(resp.read())

    transcript = (
        result.get("text", "")
        or result.get("transcription", "")
        or (", ".join(s.get("text", "") for s in result.get("segments", []))
            if "segments" in result else "")
    )
    if not transcript:
        raise ValueError(f"Whisper returned no text: {json.dumps(result)[:200]}")
    return transcript.strip()


def summarize_llm(transcript: str, title: str = "Voice Memo") -> str:
    """Summarize transcript using local LLM.

    Only the first 6000 characters of *transcript* are sent. Returns the
    first choice's message content, or "Summary unavailable." when the reply
    has no choices, no message, or no content key.
    """
    payload = {
        "model": "gemma-4-26b",
        "messages": [
            {
                "role": "system",
                "content": "Convert raw voice memo transcripts into concise useful notes. "
                           "Return markdown only with Summary, Key Points, Action Items, Open Questions.",
            },
            {
                "role": "user",
                "content": f"Title: {title}\n\nTranscript:\n{transcript[:6000]}",
            },
        ],
        "temperature": 0.2,
        "max_tokens": 900,
    }
    req = urllib.request.Request(
        LLM_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        result = json.loads(resp.read())

    # An empty "choices" list or a null "message" must fall back to the
    # placeholder rather than raise.
    choices = result.get("choices") or [{}]
    message = choices[0].get("message") or {}
    return message.get("content", "Summary unavailable.")


def generate_tts(text: str, voice: str = "af_heart") -> str:
    """Generate TTS audio using Kokoro, return path to audio file.

    When Kokoro answers with audio bytes they are stored in AUDIO_DIR and the
    local path is returned. Otherwise the value of the ``X-Download-Path``
    header, or of ``download_url`` / ``audio_url`` in a JSON body, is
    returned unchanged.

    NOTE(review): in those non-audio cases the returned value is whatever
    Kokoro reported and may not be a file inside AUDIO_DIR; callers that
    build an /audio/ URL from its basename should confirm it is servable.

    Raises:
        ValueError: if the response matches none of the formats above.
    """
    payload = {
        "model": "kokoro",
        "input": text[:4000],  # Kokoro has char limits
        "voice": voice,
        "response_format": "mp3",
        "stream": False,
        "return_download_link": True,
    }
    req = urllib.request.Request(
        KOKORO_URL,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        content_type = resp.headers.get("Content-Type", "")
        download_path = resp.headers.get("X-Download-Path", "")
        raw = resp.read()

    # Kokoro returns audio directly or with download link
    if "audio" in content_type:
        # Direct audio response. The voice is part of the hash so the same
        # text rendered with another voice does not overwrite this file.
        digest = hashlib.sha256(f"{voice}\x00{text}".encode()).hexdigest()[:16]
        filepath = os.path.join(AUDIO_DIR, digest + ".mp3")
        with open(filepath, "wb") as f:
            f.write(raw)
        return filepath

    # Check for download link in headers
    if download_path:
        return download_path

    # Try JSON response
    try:
        result = json.loads(raw)
    except ValueError:
        result = None
    if isinstance(result, dict):
        if "download_url" in result:
            return result["download_url"]
        if "audio_url" in result:
            return result["audio_url"]

    raise ValueError("Kokoro TTS returned unexpected response format")


class VoiceMemoHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler for the /healthz, /audio/, /tts and /process endpoints."""

    def do_GET(self):
        """Serve the health check and files stored in AUDIO_DIR."""
        path = self.path.rstrip("/")

        if path == "/healthz":
            _json_response(self, {"status": "ok"})
            return

        # Serve audio files: /audio/{filename}
        if path.startswith("/audio/"):
            filepath = _resolve_audio_file(path[len("/audio/"):])
            if filepath is not None:
                _file_response(self, filepath, "audio/mpeg")
                return
            _json_response(self, {"error": "audio file not found"}, status=404)
            return

        _json_response(self, {"error": "not found"}, status=404)

    def do_POST(self):
        """Parse the JSON request body and dispatch to /tts or /process."""
        path = self.path.rstrip("/")

        if path == "/healthz":
            _json_response(self, {"status": "ok"})
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
            if content_length < 0:
                # read(-1) would block until the client closes the socket.
                raise ValueError("negative Content-Length")
            body = self.rfile.read(content_length)
            data = json.loads(body) if body else {}
        except (ValueError, OSError) as e:
            _json_response(self, {"error": f"Invalid request body: {e}"}, status=400)
            return

        # The endpoint handlers look fields up by key, so the payload must
        # be a JSON object.
        if not isinstance(data, dict):
            _json_response(
                self,
                {"error": "Invalid request body: expected a JSON object"},
                status=400,
            )
            return

        if path == "/tts":
            self._handle_tts(data)
            return

        if path == "/process":
            self._handle_process(data)
            return

        _json_response(self, {"error": "not found"}, status=404)

    def _handle_tts(self, data):
        """Handle TTS-only request."""
        text = _str_field(data, "text")
        if not text:
            _json_response(self, {"error": "Missing 'text' field"}, status=400)
            return

        voice = data.get("voice", "af_heart")
        print(f"TTS: {len(text)} chars, voice={voice}", flush=True)

        try:
            audio_path = generate_tts(text, voice)
            filename = os.path.basename(audio_path)
            audio_url = f"/audio/{filename}"
            _json_response(self, {
                "audio_path": audio_path,
                "audio_url": audio_url,
                "filename": filename,
            })
        except Exception as e:
            # Request boundary: report any failure to the client as a 500.
            print(f"TTS error: {e}", flush=True)
            _json_response(self, {"error": f"TTS failed: {e}"}, status=500)

    def _handle_process(self, data):
        """Handle full voice memo processing pipeline.

        Downloads the audio (Telegram, then Discord, then plain URL, by
        priority), transcribes it, summarizes the transcript and optionally
        generates a TTS read-back. A TTS failure is logged but does not fail
        the request. The downloaded audio file is removed afterwards.
        """
        # Determine audio source
        audio_url = _str_field(data, "audio_url")
        telegram_file_id = _str_field(data, "telegram_file_id")
        discord_audio_url = _str_field(data, "discord_audio_url")
        title = data.get("title", "Voice Memo")
        tags = data.get("tags", ["voice", "memo"])
        include_tts = data.get("include_tts", False)
        voice = data.get("voice", "af_heart")

        source_type = "url"
        local_audio = None

        try:
            # Download audio from appropriate source
            if telegram_file_id:
                print(f"Processing Telegram voice: {telegram_file_id[:20]}...", flush=True)
                local_audio = download_telegram_voice(telegram_file_id)
                source_type = "telegram"
            elif discord_audio_url:
                print(f"Processing Discord voice: {discord_audio_url[:50]}...", flush=True)
                local_audio = download_audio_url(discord_audio_url)
                source_type = "discord"
            elif audio_url:
                print(f"Processing audio URL: {audio_url[:50]}...", flush=True)
                local_audio = download_audio_url(audio_url)
                source_type = "url"
            else:
                _json_response(self, {
                    "error": "Must provide one of: audio_url, telegram_file_id, discord_audio_url"
                }, status=400)
                return

            # Transcribe
            print(f"Transcribing {os.path.basename(local_audio)}...", flush=True)
            transcript = transcribe_whisper(local_audio)
            print(f"Transcript: {len(transcript)} chars", flush=True)

            # Summarize
            print("Summarizing...", flush=True)
            summary = summarize_llm(transcript, title)
            print(f"Summary: {len(summary)} chars", flush=True)

            # Optional TTS
            tts_url = None
            tts_path = None
            if include_tts and summary:
                try:
                    print("Generating TTS read-back...", flush=True)
                    tts_path = generate_tts(summary, voice)
                    tts_filename = os.path.basename(tts_path)
                    tts_url = f"/audio/{tts_filename}"
                    print(f"TTS: {tts_filename}", flush=True)
                except Exception as e:
                    # Deliberately best-effort: the summary is still returned.
                    print(f"TTS warning (non-fatal): {e}", flush=True)

            result = {
                "source_type": source_type,
                "title": title,
                "tags": tags,
                "transcript": transcript,
                "summary": summary,
                "created_at": datetime.datetime.now().isoformat(),
            }
            if tts_url:
                result["tts_audio_url"] = tts_url
                result["tts_audio_path"] = tts_path

            _json_response(self, result)

        except Exception as e:
            # Request boundary: report any pipeline failure as a 500.
            print(f"Error: {e}", flush=True)
            _json_response(self, {"error": str(e)}, status=500)
        finally:
            # Clean up downloaded audio (keep TTS files for serving)
            if local_audio and os.path.isfile(local_audio):
                try:
                    os.unlink(local_audio)
                except OSError:
                    # Best-effort cleanup; a leftover temp file is harmless.
                    pass

    def log_message(self, format, *args):
        """Silence the default per-request logging of the base class."""
        pass


def main():
    """Run the HTTP server on 0.0.0.0:PORT until interrupted."""
    server = http.server.HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler)
    print(f"voice-memo-processor listening on 0.0.0.0:{PORT}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however serve_forever() ended.
        server.server_close()


if __name__ == "__main__":
    main()