#!/usr/bin/env python3
"""Voice Memo Pipeline Service - native voice ingress + Kokoro TTS read-back.

Pipeline: fetch audio (direct upload, Telegram file id, Discord attachment
URL, generic URL, or base64) -> transcribe it with a Whisper-compatible
``/v1/audio/transcriptions`` endpoint -> summarize the transcript with an
OpenAI-compatible ``/v1/chat/completions`` endpoint -> optionally synthesize
the summary with a Kokoro ``/v1/audio/speech`` endpoint and serve the result
under ``/audio/``.

HTTP API:
    GET  /healthz        liveness probe
    GET  /audio/<name>   generated TTS audio stored in AUDIO_DIR
    POST /memo           JSON body naming the audio source
    POST /memo/upload    raw audio body, or multipart/form-data with a
                         "file" or "audio" field
"""
from __future__ import annotations

import base64
import json
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

PORT = int(os.environ.get("VOICE_MEMO_PORT", "18813"))
WHISPER_URL = os.environ.get("WHISPER_BASE_URL", "http://127.0.0.1:18811")
LLM_URL = os.environ.get("LLAMA_CPP_BASE_URL", "http://127.0.0.1:18806")
KOKORO_URL = os.environ.get("KOKORO_BASE_URL", "http://127.0.0.1:18805")
TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "")
DISCORD_BOT_TOKEN = os.environ.get("DISCORD_BOT_TOKEN", "")
KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart")
AUDIO_DIR = Path(os.environ.get("VOICE_MEMO_AUDIO_DIR", "/tmp/voice-memo-audio"))
LLM_MODEL = os.environ.get("VOICE_MEMO_LLM_MODEL", "local")
AUDIO_DIR.mkdir(parents=True, exist_ok=True)

# Extensions accepted from multipart upload filenames (anything else -> "ogg").
_UPLOAD_AUDIO_EXTS = ("ogg", "mp3", "wav", "webm", "m4a", "flac", "opus")
# Content-Type used when serving generated audio from AUDIO_DIR.
_AUDIO_MIME = {
    "mp3": "audio/mpeg",
    "ogg": "audio/ogg",
    "wav": "audio/wav",
    "flac": "audio/flac",
    "opus": "audio/opus",
    "aac": "audio/aac",
}
# save_audio builds file names from a client-supplied format; only plain
# alphanumeric extensions are allowed so the name cannot contain separators.
_SAFE_EXT_RE = re.compile(r"[A-Za-z0-9]{1,10}")
# Outermost {...} span in an LLM reply (models often wrap JSON in prose).
_JSON_OBJECT_RE = re.compile(r"\{[\s\S]*\}")


def log(msg):
    """Print a timestamped, service-prefixed log line and flush immediately."""
    print(f"[voice-memo] {time.strftime('%H:%M:%S')} {msg}", flush=True)


def encode_multipart(fields, files):
    """Build a multipart/form-data request body.

    Args:
        fields: mapping of form field name -> value (stringified).
        files: mapping of field name -> (filename, data bytes, content type).

    Returns:
        ``(body_bytes, content_type_header_value)``.
    """
    boundary = "----voice-memo-" + uuid.uuid4().hex
    parts = []
    for n, v in fields.items():
        parts.append(f"--{boundary}\r\n".encode())
        parts.append(f'Content-Disposition: form-data; name="{n}"\r\n\r\n'.encode())
        parts.append(str(v).encode())
        parts.append(b"\r\n")
    for n, (fn, data, ct) in files.items():
        parts.append(f"--{boundary}\r\n".encode())
        parts.append(
            f'Content-Disposition: form-data; name="{n}"; filename="{fn}"\r\n'.encode()
        )
        parts.append(f"Content-Type: {ct}\r\n\r\n".encode())
        parts.append(data)
        parts.append(b"\r\n")
    parts.append(f"--{boundary}--\r\n".encode())
    return b"".join(parts), f"multipart/form-data; boundary={boundary}"


def http_get_json(url, headers=None, timeout=30):
    """GET *url* with optional extra headers and return the decoded JSON body."""
    req = urllib.request.Request(url, method="GET")
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    with urllib.request.urlopen(req, timeout=timeout) as r:
        return json.loads(r.read().decode())


def http_download(url, headers=None, timeout=120):
    """GET *url* with optional extra headers and return the raw body bytes."""
    req = urllib.request.Request(url, method="GET")
    for k, v in (headers or {}).items():
        req.add_header(k, v)
    with urllib.request.urlopen(req, timeout=timeout) as r:
        return r.read()


def _http_error_detail(err, limit=300):
    """Return up to *limit* characters of an HTTPError's response body."""
    return err.read().decode("utf-8", errors="replace")[:limit]


def download_telegram_voice(file_id):
    """Download a Telegram file by id via the Bot API (getFile + file URL).

    Raises:
        ValueError: the bot token is not configured or getFile reports failure.
    """
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN not configured")
    base = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}"
    # Escape the id rather than pasting it raw into the query string.
    query = urllib.parse.urlencode({"file_id": file_id})
    info = http_get_json(f"{base}/getFile?{query}")
    if not info.get("ok"):
        raise ValueError(f"Telegram getFile failed: {info}")
    fp = info["result"]["file_path"]
    return http_download(f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{fp}")


def download_discord_attachment(url):
    """Download a Discord attachment, sending the bot token when configured."""
    hdrs = {}
    if DISCORD_BOT_TOKEN:
        hdrs["Authorization"] = f"Bot {DISCORD_BOT_TOKEN}"
    return http_download(url, headers=hdrs)


def _parse_transcript(raw):
    """Extract transcript text from a Whisper response body.

    A JSON object yields its "text" field (or "transcript" when "text" is
    missing or null). Any other JSON value, or a body that is not JSON at
    all, is treated as plain text.
    """
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return raw.strip()
    if not isinstance(data, dict):
        return raw.strip()
    text = data.get("text")
    if text is None:
        text = data.get("transcript")
    # Avoid str(None) turning a null field into the transcript "None".
    return "" if text is None else str(text).strip()


def transcribe_audio(audio_data, filename="audio.ogg", language="en"):
    """Send audio bytes to the Whisper server and return the transcript text.

    Raises:
        RuntimeError: Whisper returned an HTTP error or an empty transcript.
    """
    fields = {"response_format": "json", "language": language or "en", "temperature": "0.0"}
    files = {"file": (filename, audio_data, "application/octet-stream")}
    body, ct = encode_multipart(fields, files)
    url = WHISPER_URL.rstrip("/") + "/v1/audio/transcriptions"
    req = urllib.request.Request(url, data=body, headers={"Content-Type": ct}, method="POST")
    try:
        with urllib.request.urlopen(req, timeout=300) as r:
            raw = r.read().decode()
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Whisper HTTP {e.code}: {_http_error_detail(e)}") from e
    text = _parse_transcript(raw)
    if not text:
        raise RuntimeError("Whisper returned no transcript")
    return text


SUMMARY_PROMPT = """You process voice memos. Given the transcript, produce a JSON object with:
- "summary": 2-4 sentence summary
- "action_items": list of tasks/reminders/follow-ups (empty list if none)

Output ONLY valid JSON.

TRANSCRIPT:
{transcript}"""


def _parse_llm_summary(content):
    """Turn an LLM reply into ``{"summary": str, "action_items": list}``.

    Uses the outermost ``{...}`` span when it parses as a JSON object;
    otherwise the whole reply becomes the summary with no action items.
    """
    if not isinstance(content, str):
        # Some servers return null content; never crash on .strip().
        content = "" if content is None else str(content)
    content = content.strip()
    m = _JSON_OBJECT_RE.search(content)
    if m:
        try:
            parsed = json.loads(m.group())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            items = parsed.get("action_items", [])
            if not isinstance(items, list):
                # Normalize a bare string/object to a one-element list.
                items = [items] if items else []
            return {"summary": parsed.get("summary", content), "action_items": items}
    return {"summary": content, "action_items": []}


def summarize_transcript(transcript):
    """Ask the LLM for a JSON summary + action items of *transcript*.

    Raises:
        RuntimeError: the LLM server returned an HTTP error.
    """
    payload = {
        "model": LLM_MODEL,
        "messages": [
            {"role": "system", "content": "You output only valid JSON."},
            {"role": "user", "content": SUMMARY_PROMPT.format(transcript=transcript)},
        ],
        "temperature": 0.3,
        "max_tokens": 1024,
        "stream": False,
    }
    url = LLM_URL.rstrip("/") + "/v1/chat/completions"
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as r:
            result = json.loads(r.read().decode())
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"LLM HTTP {e.code}: {_http_error_detail(e)}") from e
    # An empty "choices" list would otherwise raise IndexError.
    choices = result.get("choices") or [{}]
    first = choices[0] if isinstance(choices[0], dict) else {}
    message = first.get("message") or {}
    return _parse_llm_summary(message.get("content"))


def generate_tts(text, voice=None, fmt="mp3", speed=1.0):
    """Synthesize *text* with Kokoro and return the audio bytes.

    Raises:
        RuntimeError: Kokoro returned an HTTP error.
    """
    payload = {
        "model": "kokoro",
        "input": text,
        "voice": voice or KOKORO_VOICE,
        "response_format": fmt,
        "speed": speed,
        "stream": False,
    }
    url = KOKORO_URL.rstrip("/") + "/v1/audio/speech"
    req = urllib.request.Request(
        url,
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json", "Accept": "audio/*"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=120) as r:
            return r.read()
    except urllib.error.HTTPError as e:
        raise RuntimeError(f"Kokoro HTTP {e.code}: {_http_error_detail(e)}") from e


def save_audio(data, fmt="mp3"):
    """Write *data* to a fresh random file in AUDIO_DIR; return its /audio/ URL path.

    Raises:
        ValueError: *fmt* is not a plain alphanumeric extension (it comes
            from the client's "tts_format" field).
    """
    fmt = str(fmt)
    if not _SAFE_EXT_RE.fullmatch(fmt):
        raise ValueError(f"Unsupported audio format: {fmt!r}")
    fname = f"{uuid.uuid4().hex}.{fmt}"
    (AUDIO_DIR / fname).write_bytes(data)
    return f"/audio/{fname}"


def _resolve_audio_file(fname):
    """Map a requested ``/audio/<fname>`` name to a regular file in AUDIO_DIR.

    Returns None for empty, dot-prefixed ("..", hidden), separator-containing,
    missing, or non-regular-file names.
    """
    if not fname or fname.startswith(".") or "/" in fname or "\\" in fname:
        return None
    fpath = AUDIO_DIR / fname
    return fpath if fpath.is_file() else None


def _fetch_audio(body, uploaded_audio=None):
    """Resolve the memo's audio bytes and a label for where they came from.

    Precedence: uploaded bytes, telegram_file_id, discord_audio_url,
    audio_url, audio_base64. An explicit "source" in *body* labels upload,
    URL, and base64 input; Telegram and Discord are always labelled as such.

    Raises:
        ValueError: no audio source is present in *body*.
    """
    source = body.get("source")
    if uploaded_audio:
        return uploaded_audio, source or "upload"
    if body.get("telegram_file_id"):
        file_id = body["telegram_file_id"]
        log(f"Downloading Telegram voice: {str(file_id)[:20]}...")
        return download_telegram_voice(file_id), "telegram"
    if body.get("discord_audio_url"):
        log("Downloading Discord attachment...")
        return download_discord_attachment(body["discord_audio_url"]), "discord"
    if body.get("audio_url"):
        log("Downloading audio URL...")
        return http_download(body["audio_url"]), source or "url"
    if body.get("audio_base64"):
        return base64.b64decode(body["audio_base64"]), source or "base64"
    raise ValueError(
        "No audio source. Send: audio_url, telegram_file_id, discord_audio_url, "
        "audio_base64, or upload."
    )


def process_memo(body, uploaded_audio=None):
    """Run the full pipeline for one memo and return the JSON-able result.

    Args:
        body: request options ("language", "audio_format", "tts_readback",
            "tts_voice", "tts_format", "source", "metadata") plus one audio
            source field (see _fetch_audio).
        uploaded_audio: raw audio bytes from /memo/upload, if any.

    Returns:
        dict with ok, transcript, summary, action_items, audio_url (None
        unless TTS read-back succeeded), source, duration_s, metadata.

    Raises:
        ValueError: missing/empty audio. Transcription and summarization
        errors propagate; TTS failures are logged and ignored.
    """
    t0 = time.time()
    language = body.get("language", "en")
    audio_fmt = body.get("audio_format", "ogg")
    tts_readback = body.get("tts_readback", False)
    tts_voice = body.get("tts_voice", KOKORO_VOICE)
    tts_format = body.get("tts_format", "mp3")

    audio_data, source = _fetch_audio(body, uploaded_audio)
    if not audio_data:
        raise ValueError("Audio data is empty")
    log(f"Got {len(audio_data)} bytes from {source}")

    ext = "ogg" if source == "telegram" else audio_fmt
    log("Transcribing...")
    transcript = transcribe_audio(audio_data, filename=f"voice_memo.{ext}", language=language)
    log(f"Transcript ({len(transcript)} chars)")

    log("Summarizing...")
    result = summarize_transcript(transcript)

    audio_url = None
    if tts_readback and result.get("summary"):
        log("Generating TTS read-back...")
        try:
            tts_data = generate_tts(result["summary"], voice=tts_voice, fmt=tts_format)
            audio_url = save_audio(tts_data, fmt=tts_format)
            log(f"TTS saved: {audio_url}")
        except Exception as exc:  # read-back is best-effort by design
            log(f"TTS failed (non-fatal): {exc}")

    elapsed = round(time.time() - t0, 2)
    log(f"Done in {elapsed}s")
    return {
        "ok": True,
        "transcript": transcript,
        "summary": result.get("summary", ""),
        "action_items": result.get("action_items", []),
        "audio_url": audio_url,
        "source": source,
        "duration_s": elapsed,
        "metadata": body.get("metadata", {}),
    }


def _multipart_boundary(content_type):
    """Return the (unquoted) boundary parameter of a multipart Content-Type.

    Raises:
        ValueError: the header carries no boundary parameter.
    """
    for param in content_type.split(";")[1:]:
        key, sep, value = param.strip().partition("=")
        if sep and key.strip().lower() == "boundary":
            return value.strip().strip('"')
    raise ValueError("multipart/form-data request has no boundary")


def _parse_multipart_audio(raw, content_type):
    """Pull the audio part out of a multipart/form-data body.

    The part named "file" or "audio" supplies the bytes (the last such part
    wins); its filename extension sets the format when it is a known audio
    extension, otherwise the format defaults to "ogg".

    Returns:
        ``(audio_bytes_or_None, audio_format)``.
    """
    boundary = _multipart_boundary(content_type)
    audio_data = None
    audio_fmt = "ogg"
    for part in raw.split(f"--{boundary}".encode()):
        if not part or part.strip() in (b"--", b"--\r\n"):
            continue  # preamble / closing delimiter
        try:
            hend = part.index(b"\r\n\r\n")
        except ValueError:
            continue  # no header/body separator: not a real part
        hdrs = part[:hend].decode("utf-8", errors="replace")
        bdata = part[hend + 4:]
        if bdata.endswith(b"\r\n"):
            bdata = bdata[:-2]  # CRLF that precedes the next delimiter
        if 'name="file"' in hdrs or 'name="audio"' in hdrs:
            audio_data = bdata
            fm = re.search(r'filename="([^"]+)"', hdrs)
            if fm:
                ext = fm.group(1).rsplit(".", 1)[-1].lower()
                if ext in _UPLOAD_AUDIO_EXTS:
                    audio_fmt = ext
    return audio_data, audio_fmt


class VoiceMemoHandler(BaseHTTPRequestHandler):
    """HTTP front end: health check, generated-audio serving, memo intake."""

    def do_GET(self):
        """Route GET /healthz and GET /audio/<name>; everything else is 404."""
        path = self.path.split("?")[0].rstrip("/")
        if path == "/healthz":
            self._json({"status": "ok", "service": "voice-memo", "port": PORT})
        elif path.startswith("/audio/"):
            self._serve_audio(path)
        else:
            self._json({"error": "not found"}, 404)

    def do_POST(self):
        """Route POST /memo (JSON) and POST /memo/upload (audio body)."""
        path = self.path.split("?")[0].rstrip("/")
        if path == "/memo":
            self._handle_json()
        elif path == "/memo/upload":
            self._handle_upload()
        else:
            self._json({"error": "not found"}, 404)

    def _handle_json(self):
        """Parse a JSON object body and run the pipeline on it."""
        try:
            n = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(n).decode())
        except Exception as e:
            return self._json({"ok": False, "error": f"Bad body: {e}"}, 400)
        if not isinstance(body, dict):
            # process_memo reads options with .get(); reject lists/scalars up front.
            return self._json({"ok": False, "error": "Bad body: expected a JSON object"}, 400)
        try:
            result = process_memo(body)
        except Exception as e:
            log(f"Error: {e}")
            return self._json({"ok": False, "error": str(e)}, 500)
        # Sent outside the try so a write failure is not answered a second time.
        self._json(result)

    def _handle_upload(self):
        """Accept raw or multipart audio and run the pipeline on it."""
        try:
            ct = self.headers.get("Content-Type", "")
            n = int(self.headers.get("Content-Length", 0))
            raw = self.rfile.read(n)
            if "multipart/form-data" in ct:
                audio_data, audio_fmt = _parse_multipart_audio(raw, ct)
            else:
                audio_data, audio_fmt = raw, "ogg"
            result = process_memo(
                {"source": "upload", "audio_format": audio_fmt}, uploaded_audio=audio_data
            )
        except Exception as e:
            log(f"Upload error: {e}")
            return self._json({"ok": False, "error": str(e)}, 500)
        self._json(result)

    def _serve_audio(self, path):
        """Serve a generated audio file from AUDIO_DIR, or 404."""
        fpath = _resolve_audio_file(path.split("/")[-1])
        if fpath is None:
            return self._json({"error": "audio not found"}, 404)
        ext = fpath.name.rsplit(".", 1)[-1].lower()
        mime = _AUDIO_MIME.get(ext, "application/octet-stream")
        data = fpath.read_bytes()
        self.send_response(200)
        self.send_header("Content-Type", mime)
        self.send_header("Content-Length", str(len(data)))
        self.send_header("Access-Control-Allow-Origin", "*")  # match _json responses
        self.end_headers()
        self.wfile.write(data)

    def _json(self, data, status=200):
        """Send *data* as an indented UTF-8 JSON response with permissive CORS."""
        body = json.dumps(data, indent=2, ensure_ascii=False).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.send_header("Access-Control-Allow-Origin", "*")
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Silence BaseHTTPRequestHandler's default per-request access log."""
        pass


def main():
    """Serve on 0.0.0.0:PORT until interrupted with Ctrl-C."""
    srv = HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler)
    log(f"Voice Memo Service on 0.0.0.0:{PORT}")
    log(f" Whisper: {WHISPER_URL} LLM: {LLM_URL} Kokoro: {KOKORO_URL}")
    try:
        srv.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        srv.server_close()


if __name__ == "__main__":
    main()