diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d8a123b --- /dev/null +++ b/Makefile @@ -0,0 +1,347 @@ +SHELL := /usr/bin/env bash +.SHELLFLAGS := -eu -o pipefail -c + +COMPOSE ?= docker compose +COMPOSE_FILE ?= docker-compose.yaml +COMMON_COMPOSE_FILE ?= swarm-common/docker-compose.yaml +ANSIBLE_DIR ?= ansible +INVENTORY ?= inventory.yml +HOST ?= zap +SERVICE ?= +PROFILE ?= +LOGS_TAIL ?= 200 +CONFIRM ?= no +OPENCLAW_REGISTRY ?= $(HOME)/.claude/state/openclaw-instances.json +OPENCLAW_PORT ?= 18789 +QEMU_URI ?= qemu:///system +LLAMA_CPP_URL ?= http://127.0.0.1:18806 +OLLAMA_URL ?= http://127.0.0.1:18807 +OPENVINO_EMBED_URL ?= http://127.0.0.1:18817 +OPENVINO_EMBED_MODEL ?= bge-base-en-v1.5-int8-ov + +DC := $(COMPOSE) -f $(COMPOSE_FILE) +COMMON_DC := $(COMPOSE) -f $(COMMON_COMPOSE_FILE) +ANSIBLE_PLAYBOOK := cd $(ANSIBLE_DIR) && ansible-playbook -i $(INVENTORY) +OPENCLAW_HOST = $(shell jq -r '.instances[] | select(.name == "$(HOST)") | .host // empty' $(OPENCLAW_REGISTRY) 2>/dev/null) +OPENCLAW_USER = $(shell jq -r '.instances[] | select(.name == "$(HOST)") | .user // "openclaw"' $(OPENCLAW_REGISTRY) 2>/dev/null) +OPENCLAW_DOMAIN = $(shell jq -r '.instances[] | select(.name == "$(HOST)") | .domain // empty' $(OPENCLAW_REGISTRY) 2>/dev/null) +REQUIRE_CONFIRM = test "$(CONFIRM)" = "yes" || { echo "This target changes VM/gateway state. Re-run with CONFIRM=yes"; exit 2; } +REQUIRE_INSTANCE = test -n "$(OPENCLAW_HOST)" -a -n "$(OPENCLAW_DOMAIN)" || { echo "Unknown OpenClaw HOST=$(HOST) in $(OPENCLAW_REGISTRY)"; exit 2; } + +.DEFAULT_GOAL := help +.PHONY: help config ps status local-ai-health openvino-embed-health up down restart pull build logs shell clean \ + api-up api-down api-restart api-init api-init-force api-health api-dedup api-logs \ + voice-up voice-gpu voice-cpu voice-down voice-build voice-logs \ + search-up search-down automation-up automation-down n8n-logs \ + common-config common-ps common-up common-down common-logs \ + openclaw-instances openclaw-info openclaw-status openclaw-health openclaw-logs \ + openclaw-version openclaw-config openclaw-ssh openclaw-root-ssh \ + gateway-status gateway-health gateway-logs gateway-restart \ + vm-list vm-autostart-list vm-info vm-mem vm-disks vm-ifaces vm-dhcp vm-start vm-shutdown vm-reboot \ + vm-autostart-enable vm-autostart-disable vm-autostart-zap-only vm-snapshot-list vm-snapshot \ + provision install customize deploy restore backup timers \ + kube-status + +help: ## Show available targets. + @awk 'BEGIN {FS = ":.*## "; printf "Usage: make [VAR=value]\n\nTargets:\n"} /^[a-zA-Z0-9_.-]+:.*## / {printf " %-18s %s\n", $$1, $$2}' $(MAKEFILE_LIST) + @printf "\nCommon vars: HOST=%s SERVICE= PROFILE= LOGS_TAIL=%s CONFIRM=%s\n" "$(HOST)" "$(LOGS_TAIL)" "$(CONFIRM)" + +config: ## Validate and render all root Docker Compose profiles. + BRAVE_API_KEY="$${BRAVE_API_KEY:-dummy}" $(DC) --profile "*" config + +ps: ## Show root Docker Compose service status. + $(DC) ps + +status: ps local-ai-health ## Show Docker service status plus host-side local AI endpoints. + +local-ai-health: ## Check host-side llama.cpp LLM, Ollama fallback, and OpenVINO NPU embeddings endpoints. + @printf "\nHost-side local AI endpoints:\n" + @printf "llama.cpp (%s): " "$(LLAMA_CPP_URL)"; \ + if curl -fsS --max-time 3 "$(LLAMA_CPP_URL)/v1/models" >/tmp/swarm-llama-models.json 2>/dev/null; then \ + printf "OK "; jq -r '[.data[].id] | join(", ")' /tmp/swarm-llama-models.json 2>/dev/null || true; \ + else \ + printf "FAILED\n"; \ + fi + @printf "ollama.service: "; systemctl --user is-active ollama.service 2>/dev/null || true + @printf "Ollama fallback API (%s): " "$(OLLAMA_URL)"; \ + curl -fsS --max-time 3 "$(OLLAMA_URL)/api/version" 2>/dev/null | jq -r '"OK version=" + .version' || printf "FAILED\n" + @printf "openvino-embeddings.service: "; systemctl --user is-active openvino-embeddings.service 2>/dev/null || true + @printf "OpenVINO NPU embeddings (%s): " "$(OPENVINO_EMBED_URL)"; \ + curl -fsS --max-time 3 "$(OPENVINO_EMBED_URL)/healthz" 2>/dev/null | jq -r '"OK model=" + .model + " device=" + .device' || printf "FAILED\n" + +openvino-embed-health: ## Smoke-test OpenVINO NPU embeddings using OPENVINO_EMBED_MODEL=bge-base-en-v1.5-int8-ov. + @curl -fsS --max-time 20 "$(OPENVINO_EMBED_URL)/v1/embeddings" \ + -H 'Content-Type: application/json' \ + -d '{"model":"$(OPENVINO_EMBED_MODEL)","input":"socket check"}' \ + | jq -r '"embeddings=" + ((.data // []) | length | tostring) + " dim=" + (((.data // [{embedding: []}])[0].embedding // []) | length | tostring) + " npu_busy_delta_us=" + ((.npu_busy_delta_us // 0) | tostring)' + +up: ## Start root compose services. Use PROFILE=api,voice,search,automation or SERVICE=name. + @if [ -n "$(PROFILE)" ]; then \ + $(DC) --profile "$(PROFILE)" up -d $(SERVICE); \ + else \ + $(DC) up -d $(SERVICE); \ + fi + +down: ## Stop root compose services. Use PROFILE=api,voice,search,automation to include profiled services. + @if [ -n "$(PROFILE)" ]; then \ + $(DC) --profile "$(PROFILE)" down; \ + else \ + $(DC) down; \ + fi + +restart: ## Restart a compose service. Use SERVICE=name. + @test -n "$(SERVICE)" || { echo "SERVICE is required, e.g. make restart SERVICE=litellm"; exit 2; } + $(DC) restart $(SERVICE) + +pull: ## Pull compose images. Use PROFILE=api,voice,search,automation or SERVICE=name. + @if [ -n "$(PROFILE)" ]; then \ + $(DC) --profile "$(PROFILE)" pull $(SERVICE); \ + else \ + $(DC) pull $(SERVICE); \ + fi + +build: ## Build compose images. Use SERVICE=whisper-server-gpu or PROFILE=voice. + @if [ -n "$(PROFILE)" ]; then \ + $(DC) --profile "$(PROFILE)" build $(SERVICE); \ + else \ + $(DC) build $(SERVICE); \ + fi + +logs: ## Follow compose logs. Use SERVICE=name and LOGS_TAIL=n. + $(DC) logs -f --tail="$(LOGS_TAIL)" $(SERVICE) + +shell: ## Open a shell in a running compose service. Use SERVICE=name. + @test -n "$(SERVICE)" || { echo "SERVICE is required, e.g. make shell SERVICE=litellm"; exit 2; } + $(DC) exec $(SERVICE) sh + +clean: ## Stop root compose services and remove anonymous volumes/orphans. + $(DC) down --remove-orphans --volumes + +api-up: ## Start LiteLLM and its Postgres/init services. + $(DC) --profile api up -d + +api-down: ## Stop LiteLLM profile services. + $(DC) --profile api down + +api-restart: ## Restart LiteLLM proxy container. + $(DC) restart litellm + +api-init: ## Run LiteLLM credential/model initialization once. + $(DC) --profile api run --rm litellm-init + +api-init-force: ## Force LiteLLM credential/model initialization. + $(DC) --profile api run --rm -e FORCE=1 litellm-init + +api-health: ## Run LiteLLM health check and auto-dedup script. + ./litellm-health-check.sh + +api-dedup: ## Remove duplicate LiteLLM model DB entries. + ./litellm-dedup.sh + +api-logs: ## Follow LiteLLM logs. + $(DC) logs -f --tail="$(LOGS_TAIL)" litellm litellm-db litellm-init + +voice-up: ## Start default voice services: NPU Whisper and Kokoro TTS. + $(DC) --profile voice up -d + +voice-gpu: ## Start manual GPU whisper fallback and Kokoro TTS. + $(DC) --profile voice-gpu --profile voice up -d whisper-server-gpu kokoro-tts + +voice-cpu: ## Start CPU whisper server and Kokoro TTS. + $(DC) --profile voice-cpu-backup --profile voice up -d whisper-server kokoro-tts + +voice-down: ## Stop voice profile services. + $(DC) --profile voice --profile voice-gpu --profile voice-cpu-backup down + +voice-build: ## Build the custom Blackwell CUDA whisper image. + $(DC) --profile voice-gpu build whisper-server-gpu + +voice-logs: ## Follow default voice service logs. + $(DC) logs -f --tail="$(LOGS_TAIL)" whisper-server-npu kokoro-tts + +search-up: ## Start Brave Search MCP and SearXNG. + $(DC) --profile search up -d + +search-down: ## Stop search profile services. + $(DC) --profile search down + +automation-up: ## Start n8n automation service. + $(DC) --profile automation up -d + +automation-down: ## Stop automation profile services. + $(DC) --profile automation down + +n8n-logs: ## Follow n8n automation logs. + $(DC) logs -f --tail="$(LOGS_TAIL)" n8n-agent + +common-config: ## Validate and render all swarm-common compose profiles. + BRAVE_API_KEY="$${BRAVE_API_KEY:-dummy}" $(COMMON_DC) --profile "*" config + +common-ps: ## Show swarm-common compose service status. + $(COMMON_DC) ps + +common-up: ## Start swarm-common compose services. Use PROFILE=... or SERVICE=name. + @if [ -n "$(PROFILE)" ]; then \ + $(COMMON_DC) --profile "$(PROFILE)" up -d $(SERVICE); \ + else \ + $(COMMON_DC) up -d $(SERVICE); \ + fi + +common-down: ## Stop swarm-common compose services. Use PROFILE=... to include profiled services. + @if [ -n "$(PROFILE)" ]; then \ + $(COMMON_DC) --profile "$(PROFILE)" down; \ + else \ + $(COMMON_DC) down; \ + fi + +common-logs: ## Follow swarm-common compose logs. Use SERVICE=name. + $(COMMON_DC) logs -f --tail="$(LOGS_TAIL)" $(SERVICE) + +openclaw-instances: ## List OpenClaw instances from the registry. + @jq -r '.instances[] | "\(.name)\t\(.status)\t\(.domain)\t\(.user)@\(.host)\t\(.vcpus)vCPU/\(.memory_mib)MiB"' $(OPENCLAW_REGISTRY) + +openclaw-info: ## Show registry details for one OpenClaw instance. Use HOST=zap. + @jq '.instances[] | select(.name == "$(HOST)")' $(OPENCLAW_REGISTRY) + +openclaw-status: openclaw-health ## Show VM and guest gateway health. Use HOST=zap. + +openclaw-health: vm-info gateway-health ## Show VM and guest gateway health. Use HOST=zap. + +openclaw-logs gateway-logs: ## Show recent OpenClaw gateway logs. Use HOST=zap LOGS_TAIL=200. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) "journalctl --user -u openclaw-gateway.service --no-pager -n $(LOGS_TAIL)" + +openclaw-version: ## Show OpenClaw service and CLI version hints. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) "grep OPENCLAW_SERVICE_VERSION ~/.config/systemd/user/openclaw-gateway.service || true; grep 'openclaw@' ~/.local/bin/openclaw | head -1 || true" + +openclaw-config: ## Show guest OpenClaw config file list. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) "find ~/.openclaw -maxdepth 2 -type f | sort | head -200" + +openclaw-ssh: ## Open SSH as the OpenClaw application user. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) + +openclaw-root-ssh: ## Open SSH as root. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh root@$(OPENCLAW_HOST) + +gateway-status: ## Show the OpenClaw gateway systemd user service status. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) "systemctl --user status openclaw-gateway.service --no-pager" + +gateway-health: ## Check gateway service, listener, HTTP status, memory, disk, and uptime. Use HOST=zap. + $(REQUIRE_INSTANCE) + ssh $(OPENCLAW_USER)@$(OPENCLAW_HOST) "systemctl --user is-active openclaw-gateway.service; ps aux | grep openclaw | grep -v grep || true; ss -tlnp | grep -E '(openclaw|$(OPENCLAW_PORT))' || true; curl -s -o /dev/null -w 'gateway_http=%{http_code}\n' http://127.0.0.1:$(OPENCLAW_PORT)/; free -h; df -h /; uptime" + +gateway-restart: ## Restart the OpenClaw gateway user service. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + ssh root@$(OPENCLAW_HOST) "su - $(OPENCLAW_USER) -c 'systemctl --user restart openclaw-gateway.service'" + $(MAKE) --no-print-directory gateway-health HOST=$(HOST) + +vm-list: ## List OpenClaw libvirt VMs. + virsh -c $(QEMU_URI) list --all + +vm-autostart-list: ## List libvirt VMs configured to start at host boot. + virsh -c $(QEMU_URI) list --all --autostart + +vm-info: ## Show libvirt domain info. Use HOST=zap. + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) dominfo "$(OPENCLAW_DOMAIN)" + +vm-mem: ## Show libvirt memory stats. Use HOST=zap. + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) dommemstat "$(OPENCLAW_DOMAIN)" + +vm-disks: ## Show libvirt disk devices. Use HOST=zap. + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) domblklist "$(OPENCLAW_DOMAIN)" + +vm-ifaces: ## Show libvirt network interfaces. Use HOST=zap. + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) domiflist "$(OPENCLAW_DOMAIN)" + +vm-dhcp: ## Show libvirt default network DHCP leases. + virsh -c $(QEMU_URI) net-dhcp-leases default + +vm-start: ## Start an OpenClaw VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) start "$(OPENCLAW_DOMAIN)" + $(MAKE) --no-print-directory vm-info HOST=$(HOST) + +vm-shutdown: ## Gracefully shut down an OpenClaw VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) shutdown "$(OPENCLAW_DOMAIN)" + +vm-reboot: ## Reboot an OpenClaw VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) reboot "$(OPENCLAW_DOMAIN)" + +vm-autostart-enable: ## Enable host-boot autostart for one OpenClaw VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) autostart "$(OPENCLAW_DOMAIN)" + $(MAKE) --no-print-directory vm-info HOST=$(HOST) + +vm-autostart-disable: ## Disable host-boot autostart for one OpenClaw VM. Use HOST=orb CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) autostart --disable "$(OPENCLAW_DOMAIN)" + $(MAKE) --no-print-directory vm-info HOST=$(HOST) + +vm-autostart-zap-only: ## Configure only zap to start at host boot. Use CONFIRM=yes. + $(REQUIRE_CONFIRM) + virsh -c $(QEMU_URI) autostart "zap [claw]" + virsh -c $(QEMU_URI) autostart --disable "orb [claw]" + virsh -c $(QEMU_URI) autostart --disable "sun [claw]" + $(MAKE) --no-print-directory vm-autostart-list + +vm-snapshot-list: ## List libvirt snapshots. Use HOST=zap. + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) snapshot-list "$(OPENCLAW_DOMAIN)" + +vm-snapshot: ## Create a libvirt snapshot. Use HOST=zap CONFIRM=yes SNAPSHOT=name. + $(REQUIRE_CONFIRM) + $(REQUIRE_INSTANCE) + virsh -c $(QEMU_URI) snapshot-create-as "$(OPENCLAW_DOMAIN)" --name "$${SNAPSHOT:-pre-change-$$(date +%Y%m%d-%H%M%S)}" + +provision: ## Provision the KVM/libvirt VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(ANSIBLE_PLAYBOOK) playbooks/provision-vm.yml --limit $(HOST) + +install: ## Install OpenClaw in the VM. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(ANSIBLE_PLAYBOOK) playbooks/install.yml --limit $(HOST) + +customize: ## Apply post-provision VM customizations. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(ANSIBLE_PLAYBOOK) playbooks/customize.yml --limit $(HOST) + +deploy: ## Run the deploy playbook. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + $(ANSIBLE_PLAYBOOK) playbooks/deploy.yml --limit $(HOST) + +restore: ## Restore OpenClaw VM config. Use HOST=zap CONFIRM=yes and optional IP=. + $(REQUIRE_CONFIRM) + @if [ -n "$${IP:-}" ]; then \ + ./restore-openclaw-vm.sh "$(HOST)" "$${IP}"; \ + else \ + ./restore-openclaw-vm.sh "$(HOST)"; \ + fi + +backup: ## Back up OpenClaw VM config. Use HOST=zap CONFIRM=yes. + $(REQUIRE_CONFIRM) + ./backup-openclaw-vm.sh "$(HOST)" + +timers: ## Show local user timers related to OpenClaw and LiteLLM. + systemctl --user list-timers 'openclaw-backup.timer' 'litellm-health-check.timer' + +kube-status: ## Show Kubernetes context, nodes, and pods using swarm-kubeconfig.yaml. + KUBECONFIG=swarm-kubeconfig.yaml kubectl config current-context + KUBECONFIG=swarm-kubeconfig.yaml kubectl get nodes -o wide + KUBECONFIG=swarm-kubeconfig.yaml kubectl get pods -A diff --git a/scripts/docker-health-server.py b/scripts/docker-health-server.py new file mode 100644 index 0000000..b235dd7 --- /dev/null +++ b/scripts/docker-health-server.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +""" +Docker Health Endpoint - lightweight HTTP server that exposes container health state. +Listens on 0.0.0.0:18809 (configurable via PORT env var). + +Endpoints: + GET /health -> all monitored containers + GET /health/ -> single container +""" + +import http.server +import json +import os +import subprocess +import sys + +PORT = int(os.environ.get("PORT", 18809)) + +# Containers to monitor +CONTAINERS = [ + "brave-search", + "kokoro-tts", + "litellm", + "litellm-db", + "n8n-agent", + "searxng", + "whisper-server-npu", +] + + +def inspect_container(name: str) -> dict: + """Run docker inspect and extract health info for a single container.""" + try: + result = subprocess.run( + ["docker", "inspect", "--format", + "{{.State.Status}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}n/a{{end}}|{{.RestartCount}}", + name], + capture_output=True, text=True, timeout=5, + ) + if result.returncode != 0: + return {"name": name, "status": "not_found", "health": "unknown", "restarts": -1} + parts = result.stdout.strip().split("|") + if len(parts) != 3: + return {"name": name, "status": "error", "health": "unknown", "restarts": -1} + return { + "name": name, + "status": parts[0], + "health": parts[1], + "restarts": int(parts[2]) if parts[2].isdigit() else 0, + } + except Exception as e: + return {"name": name, "status": "error", "health": str(e), "restarts": -1} + + +def inspect_all() -> list: + """Inspect all monitored containers.""" + return [inspect_container(c) for c in CONTAINERS] + + +class HealthHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.rstrip("/") + if path == "/health": + data = {"containers": inspect_all()} + self._json_response(data) + elif path.startswith("/health/"): + name = path[len("/health/"):] + data = inspect_container(name) + self._json_response(data) + else: + self._json_response({"error": "not found"}, status=404) + + def _json_response(self, data, status=200): + body = json.dumps(data, indent=2).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + # Suppress default stderr logging + pass + + +def main(): + server = http.server.HTTPServer(("0.0.0.0", PORT), HealthHandler) + print(f"docker-health-server listening on 0.0.0.0:{PORT}", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/scripts/url-content-extractor.py b/scripts/url-content-extractor.py new file mode 100644 index 0000000..2220e86 --- /dev/null +++ b/scripts/url-content-extractor.py @@ -0,0 +1,339 @@ +#!/usr/bin/env python3 +""" +URL Content Extractor Endpoint +Lightweight HTTP server that classifies URLs and extracts content. + +Supports: + - YouTube videos: extracts transcript via youtube-transcript-api + - PDF files: downloads and extracts text via pymupdf + - Web pages: fetches HTML and extracts readable text via readability-lxml + +Listens on 0.0.0.0:18812 (configurable via PORT env var). + +Endpoints: + POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata + GET /healthz -> returns ok +""" + +import http.server +import json +import os +import re +import sys +import tempfile +import traceback +import urllib.request +import urllib.parse +import urllib.error + +PORT = int(os.environ.get("PORT", 18812)) +MAX_CONTENT_SIZE = 50 * 1024 * 1024 # 50MB max download + +YOUTUBE_PATTERNS = [ + re.compile(r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})'), + re.compile(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})'), +] + +PDF_EXTENSIONS = ('.pdf',) +PDF_CONTENT_TYPES = ('application/pdf',) + + +def _import_youtube(): + from youtube_transcript_api import YouTubeTranscriptApi + return YouTubeTranscriptApi + +def _import_fitz(): + import fitz + return fitz + +def _import_readability(): + from readability import Document + from lxml.html import document_fromstring + return Document, document_fromstring + + +def classify_url(url: str) -> str: + """Classify URL as youtube, pdf, or web.""" + parsed = urllib.parse.urlparse(url) + host = (parsed.hostname or '').lower() + path = parsed.path.lower() + + # Check YouTube + for pat in YOUTUBE_PATTERNS: + if pat.search(url): + return 'youtube' + + # Check PDF by extension + if path.endswith(PDF_EXTENSIONS): + return 'pdf' + + # Check known PDF-hosting domains with non-.pdf paths + pdf_host_patterns = [ + 'arxiv.org/pdf/', + ] + for pattern in pdf_host_patterns: + if pattern in url.lower(): + return 'pdf' + + return 'web' + + +def extract_youtube_id(url: str) -> str | None: + """Extract YouTube video ID from URL.""" + for pat in YOUTUBE_PATTERNS: + m = pat.search(url) + if m: + return m.group(1) + return None + + +def fetch_youtube(url: str) -> dict: + """Extract YouTube video transcript.""" + YTTA = _import_youtube() + video_id = extract_youtube_id(url) + if not video_id: + return {"error": "Could not extract YouTube video ID", "content_type": "youtube"} + + try: + api = YTTA() + transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB']) + + # Try to get video title from the page + title = video_id + try: + req = urllib.request.Request( + f"https://www.youtube.com/watch?v={video_id}", + headers={"User-Agent": "Mozilla/5.0"} + ) + resp = urllib.request.urlopen(req, timeout=15) + html = resp.read().decode('utf-8', errors='replace') + m = re.search(r'(.*?)', html) + if m: + title = m.group(1).replace(' - YouTube', '').strip() + except Exception: + pass + + # Build transcript text + parts = [] + for entry in transcript_data: + parts.append(entry.text) + text = " ".join(parts) + + return { + "content_type": "youtube", + "title": title, + "text": text, + "metadata": { + "video_id": video_id, + "source_url": url, + "transcript_entries": len(transcript_data), + } + } + except Exception as e: + return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"} + + +def fetch_pdf(url: str) -> dict: + """Download PDF and extract text.""" + fitz = _import_fitz() + + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=60) + data = resp.read(MAX_CONTENT_SIZE + 1) + if len(data) > MAX_CONTENT_SIZE: + return {"error": "PDF too large (>50MB)", "content_type": "pdf"} + + with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: + tmp.write(data) + tmp.flush() + doc = fitz.open(tmp.name) + + title = "" + author = "" + try: + meta = doc.metadata or {} + title = meta.get("title", "") or "" + author = meta.get("author", "") or "" + except Exception: + pass + + if not title: + title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" + + pages = [] + for page_num in range(len(doc)): + page = doc[page_num] + pages.append(page.get_text()) + doc.close() + + text = "\n\n".join(pages) + + return { + "content_type": "pdf", + "title": title, + "text": text, + "metadata": { + "source_url": url, + "author": author, + "page_count": len(pages), + } + } + except Exception as e: + return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"} + + +def fetch_web(url: str) -> dict: + """Fetch web page and extract readable text.""" + Document, document_fromstring = _import_readability() + + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=30) + + # Check if response is actually a PDF (content-type detection) + content_type = resp.headers.get('Content-Type', '') + if 'application/pdf' in content_type: + # Re-process as PDF + data = resp.read(MAX_CONTENT_SIZE + 1) + if len(data) > MAX_CONTENT_SIZE: + return {"error": "PDF too large (>50MB)", "content_type": "pdf"} + + fitz = _import_fitz() + with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: + tmp.write(data) + tmp.flush() + doc = fitz.open(tmp.name) + title = "" + author = "" + try: + meta = doc.metadata or {} + title = meta.get("title", "") or "" + author = meta.get("author", "") or "" + except Exception: + pass + if not title: + title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" + pages = [] + for page_num in range(len(doc)): + pages.append(doc[page_num].get_text()) + doc.close() + return { + "content_type": "pdf", + "title": title, + "text": "\n\n".join(pages), + "metadata": { + "source_url": url, + "author": author, + "page_count": len(pages), + } + } + + html = resp.read().decode('utf-8', errors='replace') + + doc = Document(html) + title = doc.title() or "" + summary_html = doc.summary() + + # Convert HTML summary to plain text + tree = document_fromstring(summary_html) + text = tree.text_content() + + # Clean up whitespace + text = re.sub(r'\n{3,}', '\n\n', text) + text = text.strip() + + return { + "content_type": "web", + "title": title, + "text": text, + "metadata": { + "source_url": url, + } + } + except Exception as e: + return {"error": f"Web extraction failed: {e}", "content_type": "web"} + + +def extract_content(url: str) -> dict: + """Main extraction dispatcher.""" + content_type = classify_url(url) + + if content_type == 'youtube': + return fetch_youtube(url) + elif content_type == 'pdf': + return fetch_pdf(url) + else: + return fetch_web(url) + + +class ExtractorHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.rstrip("/") + if path == "/healthz": + self._json_response({"status": "ok"}) + else: + self._json_response({"error": "not found", "hint": "POST /extract with {url: ...}"}, status=404) + + def do_POST(self): + path = self.path.rstrip("/") + if path != "/extract": + self._json_response({"error": "not found"}, status=404) + return + + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + data = json.loads(body) if body else {} + except Exception as e: + self._json_response({"error": f"Invalid request body: {e}"}, status=400) + return + + url = data.get("url", "").strip() + if not url: + self._json_response({"error": "Missing 'url' field"}, status=400) + return + + if not url.startswith(("http://", "https://")): + self._json_response({"error": "URL must start with http:// or https://"}, status=400) + return + + print(f"Extracting: {url}", flush=True) + try: + result = extract_content(url) + except Exception as e: + result = {"error": f"Internal error: {e}"} + + if "error" in result: + print(f"Error: {result['error']}", flush=True) + self._json_response(result, status=500) + else: + ct = result.get("content_type", "?") + tlen = len(result.get("text", "")) + print(f"Success: {ct}, {tlen} chars", flush=True) + self._json_response(result) + + def _json_response(self, data, status=200): + body = json.dumps(data, indent=2).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + pass + + +def main(): + server = http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler) + print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/scripts/voice-memo-processor.py b/scripts/voice-memo-processor.py new file mode 100644 index 0000000..ab6ed02 --- /dev/null +++ b/scripts/voice-memo-processor.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +""" +Voice Memo Processor Endpoint +Handles voice memo processing with support for: + - Audio URL (existing behavior) + - Telegram voice messages (file_id) + - Kokoro TTS read-back of summaries + +Listens on 0.0.0.0:18813 (configurable via PORT env var). + +Endpoints: + POST /process -> Process voice memo (download + transcribe + summarize + optional TTS) + POST /tts -> Generate TTS audio from text (Kokoro) + GET /audio/ -> Serve generated audio file + GET /healthz -> Health check +""" + +import hashlib +import http.server +import json +import os +import re +import subprocess +import sys +import tempfile +import urllib.request +import urllib.parse +import urllib.error + +PORT = int(os.environ.get("PORT", 18813)) +AUDIO_DIR = os.path.join(tempfile.gettempdir(), "voice-memo-audio") +os.makedirs(AUDIO_DIR, exist_ok=True) + +# Service endpoints (from host perspective) +WHISPER_URL = os.environ.get("WHISPER_URL", "http://127.0.0.1:18816/v1/audio/transcriptions") +LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:18806/v1/chat/completions") +KOKORO_URL = os.environ.get("KOKORO_URL", "http://127.0.0.1:18805/v1/audio/speech") + +# Telegram Bot API +TELEGRAM_BOT_TOKEN = "" +_token_paths = [ + os.path.expanduser("~/.hermes/.env"), + os.path.expanduser("~/lab/swarm/.env"), +] +for _p in _token_paths: + if os.path.isfile(_p): + with open(_p) as _f: + for _line in _f: + _line = _line.strip() + if _line.startswith("TELEGRAM_BOT_TOKEN="): + TELEGRAM_BOT_TOKEN = _line.split("=", 1)[1].strip().strip('"').strip("'") + break + if TELEGRAM_BOT_TOKEN: + break + + +def _json_response(handler, data, status=200): + body = json.dumps(data, indent=2).encode() + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _file_response(handler, filepath, content_type="audio/mpeg"): + with open(filepath, "rb") as f: + data = f.read() + handler.send_response(200) + handler.send_header("Content-Type", content_type) + handler.send_header("Content-Length", str(len(data))) + handler.end_headers() + handler.wfile.write(data) + + +def download_telegram_voice(file_id: str) -> str: + """Download a Telegram voice file by file_id, return local path.""" + if not TELEGRAM_BOT_TOKEN: + raise ValueError("TELEGRAM_BOT_TOKEN not configured") + + # Get file path + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}" + resp = urllib.request.urlopen(url, timeout=15) + data = json.loads(resp.read()) + if not data.get("ok"): + raise ValueError(f"Telegram getFile failed: {data}") + + file_path = data["result"]["file_path"] + + # Download the file + download_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}" + resp = urllib.request.urlopen(download_url, timeout=60) + audio_data = resp.read() + + # Save to temp file with appropriate extension + ext = os.path.splitext(file_path)[1] or ".ogg" + tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR) + tmp.write(audio_data) + tmp.close() + return tmp.name + + +def download_audio_url(url: str) -> str: + """Download audio from URL, return local path.""" + ext = ".mp3" + parsed = urllib.parse.urlparse(url) + path_ext = os.path.splitext(parsed.path)[1] + if path_ext in (".ogg", ".oga", ".opus", ".wav", ".m4a", ".webm", ".flac"): + ext = path_ext + + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=60) + audio_data = resp.read() + + # Check content type for better extension guess + ct = resp.headers.get("Content-Type", "") + if "ogg" in ct: + ext = ".ogg" + elif "webm" in ct: + ext = ".webm" + elif "wav" in ct: + ext = ".wav" + elif "mp4" in ct or "m4a" in ct: + ext = ".m4a" + + tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR) + tmp.write(audio_data) + tmp.close() + return tmp.name + + +def transcribe_whisper(audio_path: str) -> str: + """Transcribe audio file using local Whisper.""" + filename = os.path.basename(audio_path) + + # Build multipart form data + boundary = "----VoiceMemoBoundary" + with open(audio_path, "rb") as f: + file_data = f.read() + + body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' + f"Content-Type: application/octet-stream\r\n\r\n" + ).encode() + file_data + ( + f"\r\n--{boundary}\r\n" + f'Content-Disposition: form-data; name="model"\r\n\r\n' + f"whisper-1\r\n" + f"--{boundary}--\r\n" + ).encode() + + req = urllib.request.Request( + WHISPER_URL, + data=body, + headers={ + "Content-Type": f"multipart/form-data; boundary={boundary}", + }, + ) + resp = urllib.request.urlopen(req, timeout=120) + result = json.loads(resp.read()) + + transcript = ( + result.get("text", "") + or result.get("transcription", "") + or (", ".join(s.get("text", "") for s in result.get("segments", [])) if "segments" in result else "") + ) + if not transcript: + raise ValueError(f"Whisper returned no text: {json.dumps(result)[:200]}") + return transcript.strip() + + +def summarize_llm(transcript: str, title: str = "Voice Memo") -> str: + """Summarize transcript using local LLM.""" + payload = { + "model": "gemma-4-26b", + "messages": [ + { + "role": "system", + "content": "Convert raw voice memo transcripts into concise useful notes. " + "Return markdown only with Summary, Key Points, Action Items, Open Questions.", + }, + { + "role": "user", + "content": f"Title: {title}\n\nTranscript:\n{transcript[:6000]}", + }, + ], + "temperature": 0.2, + "max_tokens": 900, + } + + req = urllib.request.Request( + LLM_URL, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + resp = urllib.request.urlopen(req, timeout=120) + result = json.loads(resp.read()) + + return ( + result.get("choices", [{}])[0] + .get("message", {}) + .get("content", "Summary unavailable.") + ) + + +def generate_tts(text: str, voice: str = "af_heart") -> str: + """Generate TTS audio using Kokoro, return path to audio file.""" + payload = { + "model": "kokoro", + "input": text[:4000], # Kokoro has char limits + "voice": voice, + "response_format": "mp3", + "stream": False, + "return_download_link": True, + } + + req = urllib.request.Request( + KOKORO_URL, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + resp = urllib.request.urlopen(req, timeout=120) + + # Kokoro returns audio directly or with download link + content_type = resp.headers.get("Content-Type", "") + if "audio" in content_type: + # Direct audio response + audio_data = resp.read() + filename = hashlib.sha256(text.encode()).hexdigest()[:16] + ".mp3" + filepath = os.path.join(AUDIO_DIR, filename) + with open(filepath, "wb") as f: + f.write(audio_data) + return filepath + + # Check for download link in headers + download_path = resp.headers.get("X-Download-Path", "") + if download_path: + return download_path + + # Try JSON response + try: + result = json.loads(resp.read()) + if "download_url" in result: + return result["download_url"] + if "audio_url" in result: + return result["audio_url"] + except Exception: + pass + + raise ValueError("Kokoro TTS returned unexpected response format") + + +class VoiceMemoHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.rstrip("/") + + if path == "/healthz": + _json_response(self, {"status": "ok"}) + return + + # Serve audio files: /audio/ + if path.startswith("/audio/"): + filename = path[len("/audio/"):] + filepath = os.path.join(AUDIO_DIR, filename) + if os.path.isfile(filepath): + _file_response(self, filepath, "audio/mpeg") + return + _json_response(self, {"error": "audio file not found"}, status=404) + return + + _json_response(self, {"error": "not found"}, status=404) + + def do_POST(self): + path = self.path.rstrip("/") + + if path == "/healthz": + _json_response(self, {"status": "ok"}) + return + + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + data = json.loads(body) if body else {} + except Exception as e: + _json_response(self, {"error": f"Invalid request body: {e}"}, status=400) + return + + if path == "/tts": + self._handle_tts(data) + return + + if path == "/process": + self._handle_process(data) + return + + _json_response(self, {"error": "not found"}, status=404) + + def _handle_tts(self, data): + """Handle TTS-only request.""" + text = data.get("text", "").strip() + if not text: + _json_response(self, {"error": "Missing 'text' field"}, status=400) + return + + voice = data.get("voice", "af_heart") + print(f"TTS: {len(text)} chars, voice={voice}", flush=True) + + try: + audio_path = generate_tts(text, voice) + filename = os.path.basename(audio_path) + audio_url = f"/audio/{filename}" + _json_response(self, { + "audio_path": audio_path, + "audio_url": audio_url, + "filename": filename, + }) + except Exception as e: + print(f"TTS error: {e}", flush=True) + _json_response(self, {"error": f"TTS failed: {e}"}, status=500) + + def _handle_process(self, data): + """Handle full voice memo processing pipeline.""" + # Determine audio source + audio_url = data.get("audio_url", "").strip() + telegram_file_id = data.get("telegram_file_id", "").strip() + discord_audio_url = data.get("discord_audio_url", "").strip() + title = data.get("title", "Voice Memo") + tags = data.get("tags", ["voice", "memo"]) + include_tts = data.get("include_tts", False) + voice = data.get("voice", "af_heart") + + source_type = "url" + local_audio = None + + try: + # Download audio from appropriate source + if telegram_file_id: + print(f"Processing Telegram voice: {telegram_file_id[:20]}...", flush=True) + local_audio = download_telegram_voice(telegram_file_id) + source_type = "telegram" + elif discord_audio_url: + print(f"Processing Discord voice: {discord_audio_url[:50]}...", flush=True) + local_audio = download_audio_url(discord_audio_url) + source_type = "discord" + elif audio_url: + print(f"Processing audio URL: {audio_url[:50]}...", flush=True) + local_audio = download_audio_url(audio_url) + source_type = "url" + else: + _json_response(self, { + "error": "Must provide one of: audio_url, telegram_file_id, discord_audio_url" + }, status=400) + return + + # Transcribe + print(f"Transcribing {os.path.basename(local_audio)}...", flush=True) + transcript = transcribe_whisper(local_audio) + print(f"Transcript: {len(transcript)} chars", flush=True) + + # Summarize + print("Summarizing...", flush=True) + summary = summarize_llm(transcript, title) + print(f"Summary: {len(summary)} chars", flush=True) + + # Optional TTS + tts_url = None + tts_path = None + if include_tts and summary: + try: + print("Generating TTS read-back...", flush=True) + tts_path = generate_tts(summary, voice) + tts_filename = os.path.basename(tts_path) + tts_url = f"/audio/{tts_filename}" + print(f"TTS: {tts_filename}", flush=True) + except Exception as e: + print(f"TTS warning (non-fatal): {e}", flush=True) + + result = { + "source_type": source_type, + "title": title, + "tags": tags, + "transcript": transcript, + "summary": summary, + "created_at": __import__("datetime").datetime.now().isoformat(), + } + if tts_url: + result["tts_audio_url"] = tts_url + result["tts_audio_path"] = tts_path + + _json_response(self, result) + + except Exception as e: + print(f"Error: {e}", flush=True) + _json_response(self, {"error": str(e)}, status=500) + finally: + # Clean up downloaded audio (keep TTS files for serving) + if local_audio and os.path.isfile(local_audio): + try: + os.unlink(local_audio) + except Exception: + pass + + def log_message(self, format, *args): + pass + + +def main(): + server = http.server.HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler) + print(f"voice-memo-processor listening on 0.0.0.0:{PORT}", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/scripts/voice-memo-service.py b/scripts/voice-memo-service.py new file mode 100644 index 0000000..b4599c6 --- /dev/null +++ b/scripts/voice-memo-service.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +"""Voice Memo Pipeline Service - native voice ingress + Kokoro TTS read-back.""" +from __future__ import annotations +import base64, json, os, re, time +import urllib.error, urllib.request, uuid +from http.server import HTTPServer, BaseHTTPRequestHandler +from pathlib import Path + +PORT = int(os.environ.get("VOICE_MEMO_PORT", "18813")) +WHISPER_URL = os.environ.get("WHISPER_BASE_URL", "http://127.0.0.1:18816") +LLM_URL = os.environ.get("LLAMA_CPP_BASE_URL", "http://127.0.0.1:18806") +KOKORO_URL = os.environ.get("KOKORO_BASE_URL", "http://127.0.0.1:18805") +TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "") +DISCORD_BOT_TOKEN = os.environ.get("DISCORD_BOT_TOKEN", "") +KOKORO_VOICE = os.environ.get("KOKORO_VOICE", "af_heart") +AUDIO_DIR = Path(os.environ.get("VOICE_MEMO_AUDIO_DIR", "/tmp/voice-memo-audio")) +LLM_MODEL = os.environ.get("VOICE_MEMO_LLM_MODEL", "local") +AUDIO_DIR.mkdir(parents=True, exist_ok=True) + +def log(msg): + print(f"[voice-memo] {time.strftime('%H:%M:%S')} {msg}", flush=True) + +def encode_multipart(fields, files): + boundary = "----voice-memo-" + uuid.uuid4().hex + parts = [] + for n, v in fields.items(): + parts.append(f"--{boundary}\r\n".encode()) + parts.append(f'Content-Disposition: form-data; name="{n}"\r\n\r\n'.encode()) + parts.append(str(v).encode()) + parts.append(b"\r\n") + for n, (fn, data, ct) in files.items(): + parts.append(f"--{boundary}\r\n".encode()) + parts.append(f'Content-Disposition: form-data; name="{n}"; filename="{fn}"\r\n'.encode()) + parts.append(f"Content-Type: {ct}\r\n\r\n".encode()) + parts.append(data) + parts.append(b"\r\n") + parts.append(f"--{boundary}--\r\n".encode()) + return b"".join(parts), f"multipart/form-data; boundary={boundary}" + +def http_get_json(url, headers=None, timeout=30): + req = urllib.request.Request(url, method="GET") + for k, v in (headers or {}).items(): + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=timeout) as r: + return json.loads(r.read().decode()) + +def http_download(url, headers=None, timeout=120): + req = urllib.request.Request(url, method="GET") + for k, v in (headers or {}).items(): + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.read() + +def download_telegram_voice(file_id): + if not TELEGRAM_BOT_TOKEN: + raise ValueError("TELEGRAM_BOT_TOKEN not configured") + base = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}" + info = http_get_json(f"{base}/getFile?file_id={file_id}") + if not info.get("ok"): + raise ValueError(f"Telegram getFile failed: {info}") + fp = info["result"]["file_path"] + return http_download(f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{fp}") + +def download_discord_attachment(url): + hdrs = {} + if DISCORD_BOT_TOKEN: + hdrs["Authorization"] = f"Bot {DISCORD_BOT_TOKEN}" + return http_download(url, headers=hdrs) + +def transcribe_audio(audio_data, filename="audio.ogg", language="en"): + fields = {"response_format": "json", "language": language or "en", "temperature": "0.0"} + files = {"file": (filename, audio_data, "application/octet-stream")} + body, ct = encode_multipart(fields, files) + url = WHISPER_URL.rstrip("/") + "/v1/audio/transcriptions" + req = urllib.request.Request(url, data=body, headers={"Content-Type": ct}, method="POST") + try: + with urllib.request.urlopen(req, timeout=300) as r: + raw = r.read().decode() + except urllib.error.HTTPError as e: + raise RuntimeError(f"Whisper HTTP {e.code}: {e.read().decode()[:300]}") + data = json.loads(raw) + text = str(data.get("text", data.get("transcript", ""))).strip() if isinstance(data, dict) else raw.strip() + if not text: + raise RuntimeError("Whisper returned no transcript") + return text + +SUMMARY_PROMPT = """You process voice memos. Given the transcript, produce a JSON object with: +- "summary": 2-4 sentence summary +- "action_items": list of tasks/reminders/follow-ups (empty list if none) +Output ONLY valid JSON. +TRANSCRIPT: +{transcript}""" + +def summarize_transcript(transcript): + payload = { + "model": LLM_MODEL, + "messages": [ + {"role": "system", "content": "You output only valid JSON."}, + {"role": "user", "content": SUMMARY_PROMPT.format(transcript=transcript)} + ], + "temperature": 0.3, "max_tokens": 1024, "stream": False + } + url = LLM_URL.rstrip("/") + "/v1/chat/completions" + req = urllib.request.Request(url, data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, method="POST") + try: + with urllib.request.urlopen(req, timeout=120) as r: + result = json.loads(r.read().decode()) + except urllib.error.HTTPError as e: + raise RuntimeError(f"LLM HTTP {e.code}: {e.read().decode()[:300]}") + content = result.get("choices", [{}])[0].get("message", {}).get("content", "").strip() + m = re.search(r"\{[\s\S]*\}", content) + if m: + try: + p = json.loads(m.group()) + return {"summary": p.get("summary", content), "action_items": p.get("action_items", [])} + except json.JSONDecodeError: + pass + return {"summary": content, "action_items": []} + +def generate_tts(text, voice=None, fmt="mp3", speed=1.0): + payload = {"model": "kokoro", "input": text, "voice": voice or KOKORO_VOICE, + "response_format": fmt, "speed": speed, "stream": False} + url = KOKORO_URL.rstrip("/") + "/v1/audio/speech" + req = urllib.request.Request(url, data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json", "Accept": "audio/*"}, method="POST") + try: + with urllib.request.urlopen(req, timeout=120) as r: + return r.read() + except urllib.error.HTTPError as e: + raise RuntimeError(f"Kokoro HTTP {e.code}: {e.read().decode()[:300]}") + +def save_audio(data, fmt="mp3"): + fname = f"{uuid.uuid4().hex}.{fmt}" + (AUDIO_DIR / fname).write_bytes(data) + return f"/audio/{fname}" + +def process_memo(body, uploaded_audio=None): + t0 = time.time() + language = body.get("language", "en") + audio_fmt = body.get("audio_format", "ogg") + tts_readback = body.get("tts_readback", False) + tts_voice = body.get("tts_voice", KOKORO_VOICE) + tts_format = body.get("tts_format", "mp3") + source = body.get("source", "unknown") + + if uploaded_audio: + audio_data = uploaded_audio + source = source or "upload" + elif body.get("telegram_file_id"): + log(f"Downloading Telegram voice: {body['telegram_file_id'][:20]}...") + audio_data = download_telegram_voice(body["telegram_file_id"]) + source = "telegram" + elif body.get("discord_audio_url"): + log(f"Downloading Discord attachment...") + audio_data = download_discord_attachment(body["discord_audio_url"]) + source = "discord" + elif body.get("audio_url"): + log(f"Downloading audio URL...") + audio_data = http_download(body["audio_url"]) + source = source or "url" + elif body.get("audio_base64"): + audio_data = base64.b64decode(body["audio_base64"]) + source = source or "base64" + else: + raise ValueError("No audio source. Send: audio_url, telegram_file_id, discord_audio_url, audio_base64, or upload.") + + if not audio_data: + raise ValueError("Audio data is empty") + log(f"Got {len(audio_data)} bytes from {source}") + + ext = "ogg" if source == "telegram" else audio_fmt + log("Transcribing...") + transcript = transcribe_audio(audio_data, filename=f"voice_memo.{ext}", language=language) + log(f"Transcript ({len(transcript)} chars)") + + log("Summarizing...") + result = summarize_transcript(transcript) + + audio_url = None + if tts_readback and result.get("summary"): + log("Generating TTS read-back...") + try: + tts_data = generate_tts(result["summary"], voice=tts_voice, fmt=tts_format) + audio_url = save_audio(tts_data, fmt=tts_format) + log(f"TTS saved: {audio_url}") + except Exception as exc: + log(f"TTS failed (non-fatal): {exc}") + + elapsed = round(time.time() - t0, 2) + log(f"Done in {elapsed}s") + return {"ok": True, "transcript": transcript, "summary": result.get("summary", ""), + "action_items": result.get("action_items", []), "audio_url": audio_url, + "source": source, "duration_s": elapsed, "metadata": body.get("metadata", {})} + +class VoiceMemoHandler(BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.split("?")[0].rstrip("/") + if path == "/healthz": + self._json({"status": "ok", "service": "voice-memo", "port": PORT}) + elif path.startswith("/audio/"): + self._serve_audio(path) + else: + self._json({"error": "not found"}, 404) + + def do_POST(self): + path = self.path.split("?")[0].rstrip("/") + if path == "/memo": + self._handle_json() + elif path == "/memo/upload": + self._handle_upload() + else: + self._json({"error": "not found"}, 404) + + def _handle_json(self): + try: + n = int(self.headers.get("Content-Length", 0)) + body = json.loads(self.rfile.read(n).decode()) + except Exception as e: + return self._json({"ok": False, "error": f"Bad body: {e}"}, 400) + try: + self._json(process_memo(body)) + except Exception as e: + log(f"Error: {e}") + self._json({"ok": False, "error": str(e)}, 500) + + def _handle_upload(self): + try: + ct = self.headers.get("Content-Type", "") + n = int(self.headers.get("Content-Length", 0)) + raw = self.rfile.read(n) + audio_data = None + audio_fmt = "ogg" + if "multipart/form-data" in ct: + boundary = ct.split("boundary=")[-1].strip() + for part in raw.split(f"--{boundary}".encode()): + if not part or part.strip() in (b"--", b"--\r\n"): + continue + try: + hend = part.index(b"\r\n\r\n") + except ValueError: + continue + hdrs = part[:hend].decode("utf-8", errors="replace") + bdata = part[hend+4:] + if bdata.endswith(b"\r\n"): + bdata = bdata[:-2] + if 'name="file"' in hdrs or 'name="audio"' in hdrs: + audio_data = bdata + fm = re.search(r'filename="([^"]+)"', hdrs) + if fm: + e = fm.group(1).rsplit(".", 1)[-1].lower() + if e in ("ogg","mp3","wav","webm","m4a","flac","opus"): + audio_fmt = e + else: + audio_data = raw + self._json(process_memo({"source": "upload", "audio_format": audio_fmt}, uploaded_audio=audio_data)) + except Exception as e: + log(f"Upload error: {e}") + self._json({"ok": False, "error": str(e)}, 500) + + def _serve_audio(self, path): + fname = path.split("/")[-1] + fpath = AUDIO_DIR / fname + if not fpath.exists(): + return self._json({"error": "audio not found"}, 404) + ext = fname.rsplit(".", 1)[-1].lower() + mime = {"mp3":"audio/mpeg","ogg":"audio/ogg","wav":"audio/wav", + "flac":"audio/flac","opus":"audio/opus"}.get(ext, "application/octet-stream") + data = fpath.read_bytes() + self.send_response(200) + self.send_header("Content-Type", mime) + self.send_header("Content-Length", str(len(data))) + self.end_headers() + self.wfile.write(data) + + def _json(self, data, status=200): + body = json.dumps(data, indent=2, ensure_ascii=False).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + + def log_message(self, fmt, *args): + pass + +def main(): + srv = HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler) + log(f"Voice Memo Service on 0.0.0.0:{PORT}") + log(f" Whisper: {WHISPER_URL} LLM: {LLM_URL} Kokoro: {KOKORO_URL}") + try: + srv.serve_forever() + except KeyboardInterrupt: + pass + srv.server_close() + +if __name__ == "__main__": + main()