chore(scripts): restore swarm helper utilities

This commit is contained in:
William Valentin
2026-06-04 13:26:50 -07:00
parent b88331be42
commit 1772e5a1f3
5 changed files with 1500 additions and 0 deletions
+418
View File
@@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""
Voice Memo Processor Endpoint
Handles voice memo processing with support for:
- Audio URL (existing behavior)
- Telegram voice messages (file_id)
- Kokoro TTS read-back of summaries
Listens on 0.0.0.0:18813 (configurable via PORT env var).
Endpoints:
POST /process -> Process voice memo (download + transcribe + summarize + optional TTS)
POST /tts -> Generate TTS audio from text (Kokoro)
GET /audio/<fn> -> Serve generated audio file
GET /healthz -> Health check
"""
import hashlib
import http.server
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.request
import urllib.parse
import urllib.error
PORT = int(os.environ.get("PORT", 18813))
AUDIO_DIR = os.path.join(tempfile.gettempdir(), "voice-memo-audio")
os.makedirs(AUDIO_DIR, exist_ok=True)
# Service endpoints (from host perspective)
WHISPER_URL = os.environ.get("WHISPER_URL", "http://127.0.0.1:18816/v1/audio/transcriptions")
LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:18806/v1/chat/completions")
KOKORO_URL = os.environ.get("KOKORO_URL", "http://127.0.0.1:18805/v1/audio/speech")
# Telegram Bot API
TELEGRAM_BOT_TOKEN = ""
_token_paths = [
os.path.expanduser("~/.hermes/.env"),
os.path.expanduser("~/lab/swarm/.env"),
]
for _p in _token_paths:
if os.path.isfile(_p):
with open(_p) as _f:
for _line in _f:
_line = _line.strip()
if _line.startswith("TELEGRAM_BOT_TOKEN="):
TELEGRAM_BOT_TOKEN = _line.split("=", 1)[1].strip().strip('"').strip("'")
break
if TELEGRAM_BOT_TOKEN:
break
def _json_response(handler, data, status=200):
body = json.dumps(data, indent=2).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
def _file_response(handler, filepath, content_type="audio/mpeg"):
with open(filepath, "rb") as f:
data = f.read()
handler.send_response(200)
handler.send_header("Content-Type", content_type)
handler.send_header("Content-Length", str(len(data)))
handler.end_headers()
handler.wfile.write(data)
def download_telegram_voice(file_id: str) -> str:
"""Download a Telegram voice file by file_id, return local path."""
if not TELEGRAM_BOT_TOKEN:
raise ValueError("TELEGRAM_BOT_TOKEN not configured")
# Get file path
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}"
resp = urllib.request.urlopen(url, timeout=15)
data = json.loads(resp.read())
if not data.get("ok"):
raise ValueError(f"Telegram getFile failed: {data}")
file_path = data["result"]["file_path"]
# Download the file
download_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
resp = urllib.request.urlopen(download_url, timeout=60)
audio_data = resp.read()
# Save to temp file with appropriate extension
ext = os.path.splitext(file_path)[1] or ".ogg"
tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR)
tmp.write(audio_data)
tmp.close()
return tmp.name
def download_audio_url(url: str) -> str:
"""Download audio from URL, return local path."""
ext = ".mp3"
parsed = urllib.parse.urlparse(url)
path_ext = os.path.splitext(parsed.path)[1]
if path_ext in (".ogg", ".oga", ".opus", ".wav", ".m4a", ".webm", ".flac"):
ext = path_ext
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=60)
audio_data = resp.read()
# Check content type for better extension guess
ct = resp.headers.get("Content-Type", "")
if "ogg" in ct:
ext = ".ogg"
elif "webm" in ct:
ext = ".webm"
elif "wav" in ct:
ext = ".wav"
elif "mp4" in ct or "m4a" in ct:
ext = ".m4a"
tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR)
tmp.write(audio_data)
tmp.close()
return tmp.name
def transcribe_whisper(audio_path: str) -> str:
"""Transcribe audio file using local Whisper."""
filename = os.path.basename(audio_path)
# Build multipart form data
boundary = "----VoiceMemoBoundary"
with open(audio_path, "rb") as f:
file_data = f.read()
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode() + file_data + (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="model"\r\n\r\n'
f"whisper-1\r\n"
f"--{boundary}--\r\n"
).encode()
req = urllib.request.Request(
WHISPER_URL,
data=body,
headers={
"Content-Type": f"multipart/form-data; boundary={boundary}",
},
)
resp = urllib.request.urlopen(req, timeout=120)
result = json.loads(resp.read())
transcript = (
result.get("text", "")
or result.get("transcription", "")
or (", ".join(s.get("text", "") for s in result.get("segments", [])) if "segments" in result else "")
)
if not transcript:
raise ValueError(f"Whisper returned no text: {json.dumps(result)[:200]}")
return transcript.strip()
def summarize_llm(transcript: str, title: str = "Voice Memo") -> str:
"""Summarize transcript using local LLM."""
payload = {
"model": "gemma-4-26b",
"messages": [
{
"role": "system",
"content": "Convert raw voice memo transcripts into concise useful notes. "
"Return markdown only with Summary, Key Points, Action Items, Open Questions.",
},
{
"role": "user",
"content": f"Title: {title}\n\nTranscript:\n{transcript[:6000]}",
},
],
"temperature": 0.2,
"max_tokens": 900,
}
req = urllib.request.Request(
LLM_URL,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=120)
result = json.loads(resp.read())
return (
result.get("choices", [{}])[0]
.get("message", {})
.get("content", "Summary unavailable.")
)
def generate_tts(text: str, voice: str = "af_heart") -> str:
"""Generate TTS audio using Kokoro, return path to audio file."""
payload = {
"model": "kokoro",
"input": text[:4000], # Kokoro has char limits
"voice": voice,
"response_format": "mp3",
"stream": False,
"return_download_link": True,
}
req = urllib.request.Request(
KOKORO_URL,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=120)
# Kokoro returns audio directly or with download link
content_type = resp.headers.get("Content-Type", "")
if "audio" in content_type:
# Direct audio response
audio_data = resp.read()
filename = hashlib.sha256(text.encode()).hexdigest()[:16] + ".mp3"
filepath = os.path.join(AUDIO_DIR, filename)
with open(filepath, "wb") as f:
f.write(audio_data)
return filepath
# Check for download link in headers
download_path = resp.headers.get("X-Download-Path", "")
if download_path:
return download_path
# Try JSON response
try:
result = json.loads(resp.read())
if "download_url" in result:
return result["download_url"]
if "audio_url" in result:
return result["audio_url"]
except Exception:
pass
raise ValueError("Kokoro TTS returned unexpected response format")
class VoiceMemoHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.rstrip("/")
if path == "/healthz":
_json_response(self, {"status": "ok"})
return
# Serve audio files: /audio/<filename>
if path.startswith("/audio/"):
filename = path[len("/audio/"):]
filepath = os.path.join(AUDIO_DIR, filename)
if os.path.isfile(filepath):
_file_response(self, filepath, "audio/mpeg")
return
_json_response(self, {"error": "audio file not found"}, status=404)
return
_json_response(self, {"error": "not found"}, status=404)
def do_POST(self):
path = self.path.rstrip("/")
if path == "/healthz":
_json_response(self, {"status": "ok"})
return
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
data = json.loads(body) if body else {}
except Exception as e:
_json_response(self, {"error": f"Invalid request body: {e}"}, status=400)
return
if path == "/tts":
self._handle_tts(data)
return
if path == "/process":
self._handle_process(data)
return
_json_response(self, {"error": "not found"}, status=404)
def _handle_tts(self, data):
"""Handle TTS-only request."""
text = data.get("text", "").strip()
if not text:
_json_response(self, {"error": "Missing 'text' field"}, status=400)
return
voice = data.get("voice", "af_heart")
print(f"TTS: {len(text)} chars, voice={voice}", flush=True)
try:
audio_path = generate_tts(text, voice)
filename = os.path.basename(audio_path)
audio_url = f"/audio/{filename}"
_json_response(self, {
"audio_path": audio_path,
"audio_url": audio_url,
"filename": filename,
})
except Exception as e:
print(f"TTS error: {e}", flush=True)
_json_response(self, {"error": f"TTS failed: {e}"}, status=500)
def _handle_process(self, data):
"""Handle full voice memo processing pipeline."""
# Determine audio source
audio_url = data.get("audio_url", "").strip()
telegram_file_id = data.get("telegram_file_id", "").strip()
discord_audio_url = data.get("discord_audio_url", "").strip()
title = data.get("title", "Voice Memo")
tags = data.get("tags", ["voice", "memo"])
include_tts = data.get("include_tts", False)
voice = data.get("voice", "af_heart")
source_type = "url"
local_audio = None
try:
# Download audio from appropriate source
if telegram_file_id:
print(f"Processing Telegram voice: {telegram_file_id[:20]}...", flush=True)
local_audio = download_telegram_voice(telegram_file_id)
source_type = "telegram"
elif discord_audio_url:
print(f"Processing Discord voice: {discord_audio_url[:50]}...", flush=True)
local_audio = download_audio_url(discord_audio_url)
source_type = "discord"
elif audio_url:
print(f"Processing audio URL: {audio_url[:50]}...", flush=True)
local_audio = download_audio_url(audio_url)
source_type = "url"
else:
_json_response(self, {
"error": "Must provide one of: audio_url, telegram_file_id, discord_audio_url"
}, status=400)
return
# Transcribe
print(f"Transcribing {os.path.basename(local_audio)}...", flush=True)
transcript = transcribe_whisper(local_audio)
print(f"Transcript: {len(transcript)} chars", flush=True)
# Summarize
print("Summarizing...", flush=True)
summary = summarize_llm(transcript, title)
print(f"Summary: {len(summary)} chars", flush=True)
# Optional TTS
tts_url = None
tts_path = None
if include_tts and summary:
try:
print("Generating TTS read-back...", flush=True)
tts_path = generate_tts(summary, voice)
tts_filename = os.path.basename(tts_path)
tts_url = f"/audio/{tts_filename}"
print(f"TTS: {tts_filename}", flush=True)
except Exception as e:
print(f"TTS warning (non-fatal): {e}", flush=True)
result = {
"source_type": source_type,
"title": title,
"tags": tags,
"transcript": transcript,
"summary": summary,
"created_at": __import__("datetime").datetime.now().isoformat(),
}
if tts_url:
result["tts_audio_url"] = tts_url
result["tts_audio_path"] = tts_path
_json_response(self, result)
except Exception as e:
print(f"Error: {e}", flush=True)
_json_response(self, {"error": str(e)}, status=500)
finally:
# Clean up downloaded audio (keep TTS files for serving)
if local_audio and os.path.isfile(local_audio):
try:
os.unlink(local_audio)
except Exception:
pass
def log_message(self, format, *args):
pass
def main():
server = http.server.HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler)
print(f"voice-memo-processor listening on 0.0.0.0:{PORT}", flush=True)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.server_close()
if __name__ == "__main__":
main()