From ff28a7c1addcb2300a53090dacb7377089f1570b Mon Sep 17 00:00:00 2001 From: William Valentin Date: Wed, 13 May 2026 16:13:00 -0700 Subject: [PATCH] feat(swarm): add URL content extractor + voice memo processor + webhook catalog - url-content-extractor.py on :18812: YouTube/PDF/web content extraction - voice-memo-processor.py on :18813: Telegram/Discord/URL voice ingress + Kokoro TTS - Webhook Action Bus catalog in Obsidian vault - Updated n8n Implementation Handoff: items #8-10 done --- scripts/url-content-extractor.py | 339 +++++++++++++ scripts/voice-memo-processor.py | 418 ++++++++++++++++ .../Automation/Webhook Action Bus.md | 449 ++++++++++++++++++ .../Automation/n8n Implementation Handoff.md | 49 +- 4 files changed, 1232 insertions(+), 23 deletions(-) create mode 100644 scripts/url-content-extractor.py create mode 100644 scripts/voice-memo-processor.py create mode 100644 swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/Webhook Action Bus.md diff --git a/scripts/url-content-extractor.py b/scripts/url-content-extractor.py new file mode 100644 index 0000000..2220e86 --- /dev/null +++ b/scripts/url-content-extractor.py @@ -0,0 +1,339 @@ +#!/usr/bin/env python3 +""" +URL Content Extractor Endpoint +Lightweight HTTP server that classifies URLs and extracts content. + +Supports: + - YouTube videos: extracts transcript via youtube-transcript-api + - PDF files: downloads and extracts text via pymupdf + - Web pages: fetches HTML and extracts readable text via readability-lxml + +Listens on 0.0.0.0:18812 (configurable via PORT env var). + +Endpoints: + POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata + GET /healthz -> returns ok +""" + +import http.server +import json +import os +import re +import sys +import tempfile +import traceback +import urllib.request +import urllib.parse +import urllib.error + +PORT = int(os.environ.get("PORT", 18812)) +MAX_CONTENT_SIZE = 50 * 1024 * 1024 # 50MB max download + +YOUTUBE_PATTERNS = [ + re.compile(r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})'), + re.compile(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})'), +] + +PDF_EXTENSIONS = ('.pdf',) +PDF_CONTENT_TYPES = ('application/pdf',) + + +def _import_youtube(): + from youtube_transcript_api import YouTubeTranscriptApi + return YouTubeTranscriptApi + +def _import_fitz(): + import fitz + return fitz + +def _import_readability(): + from readability import Document + from lxml.html import document_fromstring + return Document, document_fromstring + + +def classify_url(url: str) -> str: + """Classify URL as youtube, pdf, or web.""" + parsed = urllib.parse.urlparse(url) + host = (parsed.hostname or '').lower() + path = parsed.path.lower() + + # Check YouTube + for pat in YOUTUBE_PATTERNS: + if pat.search(url): + return 'youtube' + + # Check PDF by extension + if path.endswith(PDF_EXTENSIONS): + return 'pdf' + + # Check known PDF-hosting domains with non-.pdf paths + pdf_host_patterns = [ + 'arxiv.org/pdf/', + ] + for pattern in pdf_host_patterns: + if pattern in url.lower(): + return 'pdf' + + return 'web' + + +def extract_youtube_id(url: str) -> str | None: + """Extract YouTube video ID from URL.""" + for pat in YOUTUBE_PATTERNS: + m = pat.search(url) + if m: + return m.group(1) + return None + + +def fetch_youtube(url: str) -> dict: + """Extract YouTube video transcript.""" + YTTA = _import_youtube() + video_id = extract_youtube_id(url) + if not video_id: + return {"error": "Could not extract YouTube video ID", "content_type": "youtube"} + + try: + api = YTTA() + transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB']) + + # Try to get video title from the page + title = video_id + try: + req = urllib.request.Request( + f"https://www.youtube.com/watch?v={video_id}", + headers={"User-Agent": "Mozilla/5.0"} + ) + resp = urllib.request.urlopen(req, timeout=15) + html = resp.read().decode('utf-8', errors='replace') + m = re.search(r'(.*?)', html) + if m: + title = m.group(1).replace(' - YouTube', '').strip() + except Exception: + pass + + # Build transcript text + parts = [] + for entry in transcript_data: + parts.append(entry.text) + text = " ".join(parts) + + return { + "content_type": "youtube", + "title": title, + "text": text, + "metadata": { + "video_id": video_id, + "source_url": url, + "transcript_entries": len(transcript_data), + } + } + except Exception as e: + return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"} + + +def fetch_pdf(url: str) -> dict: + """Download PDF and extract text.""" + fitz = _import_fitz() + + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=60) + data = resp.read(MAX_CONTENT_SIZE + 1) + if len(data) > MAX_CONTENT_SIZE: + return {"error": "PDF too large (>50MB)", "content_type": "pdf"} + + with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: + tmp.write(data) + tmp.flush() + doc = fitz.open(tmp.name) + + title = "" + author = "" + try: + meta = doc.metadata or {} + title = meta.get("title", "") or "" + author = meta.get("author", "") or "" + except Exception: + pass + + if not title: + title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" + + pages = [] + for page_num in range(len(doc)): + page = doc[page_num] + pages.append(page.get_text()) + doc.close() + + text = "\n\n".join(pages) + + return { + "content_type": "pdf", + "title": title, + "text": text, + "metadata": { + "source_url": url, + "author": author, + "page_count": len(pages), + } + } + except Exception as e: + return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"} + + +def fetch_web(url: str) -> dict: + """Fetch web page and extract readable text.""" + Document, document_fromstring = _import_readability() + + try: + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=30) + + # Check if response is actually a PDF (content-type detection) + content_type = resp.headers.get('Content-Type', '') + if 'application/pdf' in content_type: + # Re-process as PDF + data = resp.read(MAX_CONTENT_SIZE + 1) + if len(data) > MAX_CONTENT_SIZE: + return {"error": "PDF too large (>50MB)", "content_type": "pdf"} + + fitz = _import_fitz() + with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: + tmp.write(data) + tmp.flush() + doc = fitz.open(tmp.name) + title = "" + author = "" + try: + meta = doc.metadata or {} + title = meta.get("title", "") or "" + author = meta.get("author", "") or "" + except Exception: + pass + if not title: + title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" + pages = [] + for page_num in range(len(doc)): + pages.append(doc[page_num].get_text()) + doc.close() + return { + "content_type": "pdf", + "title": title, + "text": "\n\n".join(pages), + "metadata": { + "source_url": url, + "author": author, + "page_count": len(pages), + } + } + + html = resp.read().decode('utf-8', errors='replace') + + doc = Document(html) + title = doc.title() or "" + summary_html = doc.summary() + + # Convert HTML summary to plain text + tree = document_fromstring(summary_html) + text = tree.text_content() + + # Clean up whitespace + text = re.sub(r'\n{3,}', '\n\n', text) + text = text.strip() + + return { + "content_type": "web", + "title": title, + "text": text, + "metadata": { + "source_url": url, + } + } + except Exception as e: + return {"error": f"Web extraction failed: {e}", "content_type": "web"} + + +def extract_content(url: str) -> dict: + """Main extraction dispatcher.""" + content_type = classify_url(url) + + if content_type == 'youtube': + return fetch_youtube(url) + elif content_type == 'pdf': + return fetch_pdf(url) + else: + return fetch_web(url) + + +class ExtractorHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.rstrip("/") + if path == "/healthz": + self._json_response({"status": "ok"}) + else: + self._json_response({"error": "not found", "hint": "POST /extract with {url: ...}"}, status=404) + + def do_POST(self): + path = self.path.rstrip("/") + if path != "/extract": + self._json_response({"error": "not found"}, status=404) + return + + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + data = json.loads(body) if body else {} + except Exception as e: + self._json_response({"error": f"Invalid request body: {e}"}, status=400) + return + + url = data.get("url", "").strip() + if not url: + self._json_response({"error": "Missing 'url' field"}, status=400) + return + + if not url.startswith(("http://", "https://")): + self._json_response({"error": "URL must start with http:// or https://"}, status=400) + return + + print(f"Extracting: {url}", flush=True) + try: + result = extract_content(url) + except Exception as e: + result = {"error": f"Internal error: {e}"} + + if "error" in result: + print(f"Error: {result['error']}", flush=True) + self._json_response(result, status=500) + else: + ct = result.get("content_type", "?") + tlen = len(result.get("text", "")) + print(f"Success: {ct}, {tlen} chars", flush=True) + self._json_response(result) + + def _json_response(self, data, status=200): + body = json.dumps(data, indent=2).encode() + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format, *args): + pass + + +def main(): + server = http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler) + print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/scripts/voice-memo-processor.py b/scripts/voice-memo-processor.py new file mode 100644 index 0000000..33fbf5d --- /dev/null +++ b/scripts/voice-memo-processor.py @@ -0,0 +1,418 @@ +#!/usr/bin/env python3 +""" +Voice Memo Processor Endpoint +Handles voice memo processing with support for: + - Audio URL (existing behavior) + - Telegram voice messages (file_id) + - Kokoro TTS read-back of summaries + +Listens on 0.0.0.0:18813 (configurable via PORT env var). + +Endpoints: + POST /process -> Process voice memo (download + transcribe + summarize + optional TTS) + POST /tts -> Generate TTS audio from text (Kokoro) + GET /audio/ -> Serve generated audio file + GET /healthz -> Health check +""" + +import hashlib +import http.server +import json +import os +import re +import subprocess +import sys +import tempfile +import urllib.request +import urllib.parse +import urllib.error + +PORT = int(os.environ.get("PORT", 18813)) +AUDIO_DIR = os.path.join(tempfile.gettempdir(), "voice-memo-audio") +os.makedirs(AUDIO_DIR, exist_ok=True) + +# Service endpoints (from host perspective) +WHISPER_URL = os.environ.get("WHISPER_URL", "http://127.0.0.1:18811/v1/audio/transcriptions") +LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:18806/v1/chat/completions") +KOKORO_URL = os.environ.get("KOKORO_URL", "http://127.0.0.1:18805/v1/audio/speech") + +# Telegram Bot API +TELEGRAM_BOT_TOKEN = "" +_token_paths = [ + os.path.expanduser("~/.hermes/.env"), + os.path.expanduser("~/lab/swarm/.env"), +] +for _p in _token_paths: + if os.path.isfile(_p): + with open(_p) as _f: + for _line in _f: + _line = _line.strip() + if _line.startswith("TELEGRAM_BOT_TOKEN="): + TELEGRAM_BOT_TOKEN = _line.split("=", 1)[1].strip().strip('"').strip("'") + break + if TELEGRAM_BOT_TOKEN: + break + + +def _json_response(handler, data, status=200): + body = json.dumps(data, indent=2).encode() + handler.send_response(status) + handler.send_header("Content-Type", "application/json") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _file_response(handler, filepath, content_type="audio/mpeg"): + with open(filepath, "rb") as f: + data = f.read() + handler.send_response(200) + handler.send_header("Content-Type", content_type) + handler.send_header("Content-Length", str(len(data))) + handler.end_headers() + handler.wfile.write(data) + + +def download_telegram_voice(file_id: str) -> str: + """Download a Telegram voice file by file_id, return local path.""" + if not TELEGRAM_BOT_TOKEN: + raise ValueError("TELEGRAM_BOT_TOKEN not configured") + + # Get file path + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}" + resp = urllib.request.urlopen(url, timeout=15) + data = json.loads(resp.read()) + if not data.get("ok"): + raise ValueError(f"Telegram getFile failed: {data}") + + file_path = data["result"]["file_path"] + + # Download the file + download_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}" + resp = urllib.request.urlopen(download_url, timeout=60) + audio_data = resp.read() + + # Save to temp file with appropriate extension + ext = os.path.splitext(file_path)[1] or ".ogg" + tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR) + tmp.write(audio_data) + tmp.close() + return tmp.name + + +def download_audio_url(url: str) -> str: + """Download audio from URL, return local path.""" + ext = ".mp3" + parsed = urllib.parse.urlparse(url) + path_ext = os.path.splitext(parsed.path)[1] + if path_ext in (".ogg", ".oga", ".opus", ".wav", ".m4a", ".webm", ".flac"): + ext = path_ext + + req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) + resp = urllib.request.urlopen(req, timeout=60) + audio_data = resp.read() + + # Check content type for better extension guess + ct = resp.headers.get("Content-Type", "") + if "ogg" in ct: + ext = ".ogg" + elif "webm" in ct: + ext = ".webm" + elif "wav" in ct: + ext = ".wav" + elif "mp4" in ct or "m4a" in ct: + ext = ".m4a" + + tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR) + tmp.write(audio_data) + tmp.close() + return tmp.name + + +def transcribe_whisper(audio_path: str) -> str: + """Transcribe audio file using local Whisper.""" + filename = os.path.basename(audio_path) + + # Build multipart form data + boundary = "----VoiceMemoBoundary" + with open(audio_path, "rb") as f: + file_data = f.read() + + body = ( + f"--{boundary}\r\n" + f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' + f"Content-Type: application/octet-stream\r\n\r\n" + ).encode() + file_data + ( + f"\r\n--{boundary}\r\n" + f'Content-Disposition: form-data; name="model"\r\n\r\n' + f"whisper-1\r\n" + f"--{boundary}--\r\n" + ).encode() + + req = urllib.request.Request( + WHISPER_URL, + data=body, + headers={ + "Content-Type": f"multipart/form-data; boundary={boundary}", + }, + ) + resp = urllib.request.urlopen(req, timeout=120) + result = json.loads(resp.read()) + + transcript = ( + result.get("text", "") + or result.get("transcription", "") + or (", ".join(s.get("text", "") for s in result.get("segments", [])) if "segments" in result else "") + ) + if not transcript: + raise ValueError(f"Whisper returned no text: {json.dumps(result)[:200]}") + return transcript.strip() + + +def summarize_llm(transcript: str, title: str = "Voice Memo") -> str: + """Summarize transcript using local LLM.""" + payload = { + "model": "gemma-4-26b", + "messages": [ + { + "role": "system", + "content": "Convert raw voice memo transcripts into concise useful notes. " + "Return markdown only with Summary, Key Points, Action Items, Open Questions.", + }, + { + "role": "user", + "content": f"Title: {title}\n\nTranscript:\n{transcript[:6000]}", + }, + ], + "temperature": 0.2, + "max_tokens": 900, + } + + req = urllib.request.Request( + LLM_URL, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + resp = urllib.request.urlopen(req, timeout=120) + result = json.loads(resp.read()) + + return ( + result.get("choices", [{}])[0] + .get("message", {}) + .get("content", "Summary unavailable.") + ) + + +def generate_tts(text: str, voice: str = "af_heart") -> str: + """Generate TTS audio using Kokoro, return path to audio file.""" + payload = { + "model": "kokoro", + "input": text[:4000], # Kokoro has char limits + "voice": voice, + "response_format": "mp3", + "stream": False, + "return_download_link": True, + } + + req = urllib.request.Request( + KOKORO_URL, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json"}, + ) + resp = urllib.request.urlopen(req, timeout=120) + + # Kokoro returns audio directly or with download link + content_type = resp.headers.get("Content-Type", "") + if "audio" in content_type: + # Direct audio response + audio_data = resp.read() + filename = hashlib.sha256(text.encode()).hexdigest()[:16] + ".mp3" + filepath = os.path.join(AUDIO_DIR, filename) + with open(filepath, "wb") as f: + f.write(audio_data) + return filepath + + # Check for download link in headers + download_path = resp.headers.get("X-Download-Path", "") + if download_path: + return download_path + + # Try JSON response + try: + result = json.loads(resp.read()) + if "download_url" in result: + return result["download_url"] + if "audio_url" in result: + return result["audio_url"] + except Exception: + pass + + raise ValueError("Kokoro TTS returned unexpected response format") + + +class VoiceMemoHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + path = self.path.rstrip("/") + + if path == "/healthz": + _json_response(self, {"status": "ok"}) + return + + # Serve audio files: /audio/ + if path.startswith("/audio/"): + filename = path[len("/audio/"):] + filepath = os.path.join(AUDIO_DIR, filename) + if os.path.isfile(filepath): + _file_response(self, filepath, "audio/mpeg") + return + _json_response(self, {"error": "audio file not found"}, status=404) + return + + _json_response(self, {"error": "not found"}, status=404) + + def do_POST(self): + path = self.path.rstrip("/") + + if path == "/healthz": + _json_response(self, {"status": "ok"}) + return + + try: + content_length = int(self.headers.get("Content-Length", 0)) + body = self.rfile.read(content_length) + data = json.loads(body) if body else {} + except Exception as e: + _json_response(self, {"error": f"Invalid request body: {e}"}, status=400) + return + + if path == "/tts": + self._handle_tts(data) + return + + if path == "/process": + self._handle_process(data) + return + + _json_response(self, {"error": "not found"}, status=404) + + def _handle_tts(self, data): + """Handle TTS-only request.""" + text = data.get("text", "").strip() + if not text: + _json_response(self, {"error": "Missing 'text' field"}, status=400) + return + + voice = data.get("voice", "af_heart") + print(f"TTS: {len(text)} chars, voice={voice}", flush=True) + + try: + audio_path = generate_tts(text, voice) + filename = os.path.basename(audio_path) + audio_url = f"/audio/{filename}" + _json_response(self, { + "audio_path": audio_path, + "audio_url": audio_url, + "filename": filename, + }) + except Exception as e: + print(f"TTS error: {e}", flush=True) + _json_response(self, {"error": f"TTS failed: {e}"}, status=500) + + def _handle_process(self, data): + """Handle full voice memo processing pipeline.""" + # Determine audio source + audio_url = data.get("audio_url", "").strip() + telegram_file_id = data.get("telegram_file_id", "").strip() + discord_audio_url = data.get("discord_audio_url", "").strip() + title = data.get("title", "Voice Memo") + tags = data.get("tags", ["voice", "memo"]) + include_tts = data.get("include_tts", False) + voice = data.get("voice", "af_heart") + + source_type = "url" + local_audio = None + + try: + # Download audio from appropriate source + if telegram_file_id: + print(f"Processing Telegram voice: {telegram_file_id[:20]}...", flush=True) + local_audio = download_telegram_voice(telegram_file_id) + source_type = "telegram" + elif discord_audio_url: + print(f"Processing Discord voice: {discord_audio_url[:50]}...", flush=True) + local_audio = download_audio_url(discord_audio_url) + source_type = "discord" + elif audio_url: + print(f"Processing audio URL: {audio_url[:50]}...", flush=True) + local_audio = download_audio_url(audio_url) + source_type = "url" + else: + _json_response(self, { + "error": "Must provide one of: audio_url, telegram_file_id, discord_audio_url" + }, status=400) + return + + # Transcribe + print(f"Transcribing {os.path.basename(local_audio)}...", flush=True) + transcript = transcribe_whisper(local_audio) + print(f"Transcript: {len(transcript)} chars", flush=True) + + # Summarize + print("Summarizing...", flush=True) + summary = summarize_llm(transcript, title) + print(f"Summary: {len(summary)} chars", flush=True) + + # Optional TTS + tts_url = None + tts_path = None + if include_tts and summary: + try: + print("Generating TTS read-back...", flush=True) + tts_path = generate_tts(summary, voice) + tts_filename = os.path.basename(tts_path) + tts_url = f"/audio/{tts_filename}" + print(f"TTS: {tts_filename}", flush=True) + except Exception as e: + print(f"TTS warning (non-fatal): {e}", flush=True) + + result = { + "source_type": source_type, + "title": title, + "tags": tags, + "transcript": transcript, + "summary": summary, + "created_at": __import__("datetime").datetime.now().isoformat(), + } + if tts_url: + result["tts_audio_url"] = tts_url + result["tts_audio_path"] = tts_path + + _json_response(self, result) + + except Exception as e: + print(f"Error: {e}", flush=True) + _json_response(self, {"error": str(e)}, status=500) + finally: + # Clean up downloaded audio (keep TTS files for serving) + if local_audio and os.path.isfile(local_audio): + try: + os.unlink(local_audio) + except Exception: + pass + + def log_message(self, format, *args): + pass + + +def main(): + server = http.server.HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler) + print(f"voice-memo-processor listening on 0.0.0.0:{PORT}", flush=True) + try: + server.serve_forever() + except KeyboardInterrupt: + pass + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/Webhook Action Bus.md b/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/Webhook Action Bus.md new file mode 100644 index 0000000..fdfdcef --- /dev/null +++ b/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/Webhook Action Bus.md @@ -0,0 +1,449 @@ +--- +title: Webhook Action Bus +area: infrastructure +tags: [infrastructure, automation, webhooks, n8n, api] +created: 2026-05-13 +updated: 2026-05-13 +status: active +related: "[[Infrastructure/Automation/n8n Workflows]], [[Infrastructure/Architecture]], [[Infrastructure/Services/Docker Services]]" +--- + +# Webhook Action Bus + +Central catalog of all webhook endpoints in the n8n automation stack. Every webhook-triggered workflow and host-side HTTP endpoint is documented here with its URL, method, authentication, request/response schemas, and implementation status. + +## Architecture + +``` +External Caller + | + v +n8n Webhook (port 18808) + | + +-- /webhook/openclaw-action --> OpenClaw Action Bus (router) + +-- /webhook/openclaw-reminder --> Reminder Webhook + +-- /webhook/web-to-notes --> Web-to-Notes Capture + +-- /webhook/voice-memo --> Voice Memo Capture + | +Host-side Services (from Docker: 172.19.0.1) + +-- :18809/health --> Docker Container Health + +-- :18810/reindex --> Obsidian Vault Reindex + +-- :18810/healthz --> Obsidian Reindex Health + +-- :18810/reindex/status --> Obsidian Reindex Status +``` + +### n8n Webhook URL Structure + +All n8n webhooks follow this pattern: + +``` +http://{host}:18808/webhook/{path} +``` + +In production (from inside Docker), n8n sees itself at `http://localhost:18808/` with `WEBHOOK_URL=http://localhost:18808/`. + +--- + +## Endpoint Catalog + +### 1. OpenClaw Action Bus + +| Field | Value | +|-------|-------| +| **Workflow** | OpenClaw Action Bus | +| **Workflow ID** | `Jwi54VWMdlLqYnRo` | +| **Status** | Active, Implemented | +| **URL** | `POST http://{host}:18808/webhook/openclaw-action` | +| **Authentication** | Header Auth (`OpenClaw Webhook Header` credential) | +| **Auth Header** | `x-openclaw-secret: {secret}` | + +**Purpose:** Central action router. Accepts a JSON body with an `action` field and routes to the appropriate handler. Supports 30+ actions including email, calendar, tasks, drive, docs, notifications, approvals, and URL fetching. + +#### Request Schema + +```json +{ + "action": "string (required) - one of the supported action names", + "args": { + "// action-specific parameters, see below" + }, + "request_id": "string (optional) - client-supplied correlation ID" +} +``` + +#### Supported Actions + +| Action | Args | Description | +|--------|------|-------------| +| `notify` | `{ message: string }` | Send a notification via Telegram | +| `send_notification_draft` | `{ title, message, draft_id? }` | Create & send a notification draft (requires approval) | +| `fetch_and_normalize_url` | `{ url: string, max_chars?: number (500-20000, default 8000) }` | Fetch a URL, strip HTML, return clean text | +| `send_email_draft` | `{ to: string[], cc?: string[], subject: string, body: string }` | Create an email draft for approval | +| `list_email_drafts` | `{ max?: number (1-100, default 20), page?: string }` | List pending email drafts | +| `delete_email_draft` | `{ draft_id: string }` | Delete an email draft | +| `send_gmail_draft` | `{ draft_id: string }` | Send an approved email draft | +| `send_approved_email` | `{ draft_id: string }` | Alias for `send_gmail_draft` | +| `create_calendar_event` | `{ title, start, end, description?, location?, calendar? }` | Create a calendar event | +| `list_upcoming_events` | `{ calendar?: string, max?: number (1-100, default 20), days_ahead?: number }` | List upcoming calendar events | +| `update_calendar_event` | `{ calendar?, event_id, title?, start?, end?, description?, location? }` | Update a calendar event | +| `delete_calendar_event` | `{ calendar?, event_id: string }` | Delete a calendar event | +| `tasks_add` | `{ title: string, notes?: string, due?: string, tasklist_id?: string }` | Add a Google Task | +| `tasks_list` | `{ tasklist_id?: string, max?: number (1-100, default 20) }` | List Google Tasks | +| `tasks_done` | `{ task_id: string, tasklist_id?: string }` | Mark a Google Task as done | +| `tasks_delete` | `{ task_id: string, tasklist_id?: string }` | Delete a Google Task | +| `drive_search` | `{ query: string, max?: number (1-50, default 10) }` | Search Google Drive | +| `drive_upload` | `{ local_path: string, folder_id?: string }` | Upload file to Google Drive | +| `drive_download` | `{ file_id: string, dest_path: string }` | Download file from Google Drive | +| `docs_list` | `{ max?: number (1-50, default 10) }` | List Google Docs | +| `docs_read` | `{ doc_id: string }` | Read a Google Doc | +| `docs_create` | `{ title: string, content?: string }` | Create a Google Doc | +| `docs_write` | `{ doc_id: string, content: string }` | Write to a Google Doc | +| `docs_export` | `{ doc_id: string, format?: string (default 'md') }` | Export a Google Doc | +| `approval_queue_add` | `{ kind?: string, summary: string }` | Add item to approval queue | +| `approval_queue_list` | `{ limit?: number, include_history?: boolean }` | List approval queue | +| `approval_queue_resolve` | `{ id: string, decision: 'approve' or 'reject' }` | Resolve an approval item | +| `approval_history_attach_execution` | `{ id: string, execution: object }` | Attach execution data to approval history | +| `append_log` | `{ text: string }` | Append to the action log | +| `get_logs` | `{ limit?: number (1-50, default 20) }` | Retrieve action log entries | +| `inbound_event_filter` | `{ ...event data }` | Classify an inbound event | + +#### Response Schema + +```json +{ + "ok": true, + "action": "string - the action that was executed", + "// ... action-specific response fields" +} +``` + +Error responses: +```json +{ + "ok": false, + "error": "string - error description", + "statusCode": 400 +} +``` + +--- + +### 2. OpenClaw Reminder Webhook + +| Field | Value | +|-------|-------| +| **Workflow** | OpenClaw Reminder Webhook | +| **Workflow ID** | `RUR1CGn0ikkxbPin` | +| **Status** | Active, Implemented | +| **URL** | `POST http://{host}:18808/webhook/openclaw-reminder` | +| **Authentication** | Header Auth (`OpenClaw Webhook Header` credential) | +| **Auth Header** | `x-openclaw-secret: {secret}` | + +**Purpose:** Accepts a reminder payload and sends it to both Telegram and Discord. + +#### Request Schema + +```json +{ + "title": "string (required) - reminder title", + "dueAt": "string (optional) - due date/time, e.g. '2026-05-14T09:00:00'", + "context": "string (optional) - additional context for the reminder" +} +``` + +#### Response Schema + +```json +{ + "ok": true, + "sentTelegram": true, + "sentDiscord": true +} +``` + +--- + +### 3. Web-to-Notes Capture + +| Field | Value | +|-------|-------| +| **Workflow** | Web-to-Notes Capture (Local LLM + Obsidian) | +| **Workflow ID** | `GSmzuA5dgGgyRg5v` | +| **Status** | Active, Implemented | +| **URL** | `POST http://{host}:18808/webhook/web-to-notes` | +| **Authentication** | None | +| **Webhook ID** | `7958ecbc-c714-41d5-a829-882447ab95f8` | + +**Purpose:** Captures a URL, fetches its content, summarizes it with the local LLM (Gemma), and saves the result as an Obsidian note. Also sends a Telegram notification. + +#### Request Schema + +```json +{ + "url": "string (required) - HTTP(S) URL to capture", + "title": "string (optional) - override title (default: extracted from page)", + "notes": "string (optional) - personal notes/comment about the capture", + "tags": "string[] | string (optional) - comma-separated or array of tags (default: ['web-capture'])" +} +``` + +#### Response Schema + +```json +{ + "ok": true, + "notePath": "string - Obsidian vault path, e.g. 'Notes/2026-05-13 My Page.md'", + "title": "string - the note title", + "source": "string - the original URL" +} +``` + +--- + +### 4. Voice Memo Capture + +| Field | Value | +|-------|-------| +| **Workflow** | Voice Memo Capture (Audio URL + Local Whisper) | +| **Workflow ID** | `El1BHJZ56JlzhrRZ` | +| **Status** | Active, Implemented | +| **URL** | `POST http://{host}:18808/webhook/voice-memo` | +| **Authentication** | None | +| **Webhook ID** | `06796590-13b3-4347-9582-1ac92719c95d` | + +**Purpose:** Downloads an audio file from a URL, transcribes it with the local Whisper service, summarizes with the local LLM, and saves as an Obsidian note. Sends a Telegram notification. + +#### Request Schema + +```json +{ + "audio_url": "string (required) - HTTP(S) URL to the audio file", + "title": "string (optional) - title for the note (default: 'Voice Memo')", + "source": "string (optional) - source attribution (default: the audio_url)", + "tags": "string[] | string (optional) - tags (default: ['voice', 'memo'])" +} +``` + +#### Response Schema + +```json +{ + "ok": true, + "notePath": "string - Obsidian vault path, e.g. 'Voice Memos/2026-05-13-my-memo.md'", + "title": "string - the note title" +} +``` + +--- + +### 5. Docker Container Health + +| Field | Value | +|-------|-------| +| **Status** | Active, Implemented | +| **URL** | `GET http://{host}:18809/health` | +| **Authentication** | None | + +**Purpose:** Returns the health status of all Docker containers in the swarm. + +#### Response Schema + +```json +{ + "containers": [ + { + "name": "string - container name", + "status": "string - e.g. 'running'", + "image": "string - container image" + } + ] +} +``` + +--- + +### 6. Obsidian Vault Reindex + +| Field | Value | +|-------|-------| +| **Status** | Active, Implemented | +| **URL** | `POST http://{host}:18810/reindex` | +| **Authentication** | None | +| **Timeout** | 300s (5 min) | + +**Purpose:** Triggers a full reindex of the Obsidian vault for search. + +#### Response + +Returns the reindex result (status, file count, etc.) + +--- + +### 7. Obsidian Reindex Health Check + +| Field | Value | +|-------|-------| +| **Status** | Active, Implemented | +| **URL** | `GET http://{host}:18810/healthz` | +| **Authentication** | None | + +**Purpose:** Health check for the Obsidian reindex service. + +#### Response Schema + +```json +{ + "status": "ok" +} +``` + +--- + +### 8. Obsidian Reindex Status + +| Field | Value | +|-------|-------| +| **Status** | Active, Implemented | +| **URL** | `GET http://{host}:18810/reindex/status` | +| **Authentication** | None | + +**Purpose:** Returns the current reindex status including file hashes. + +#### Response Schema + +```json +{ + "files": { + "path/to/file.md": "sha256-hash" + } +} +``` + +--- + +## Gap Analysis: Endpoints That Need Implementation + +The following endpoints are defined in the action bus architecture but do NOT yet have dedicated webhook-triggered workflows. Some are partially covered by Action Bus actions. + +### 1. `process_url` - Capture and Summarize a URL + +| Field | Value | +|-------|-------| +| **Status** | COVERED by `Web-to-Notes Capture` endpoint AND Action Bus `fetch_and_normalize_url` | +| **Gap** | No dedicated `/webhook/process-url` endpoint exists, but the functionality is fully available via `/webhook/web-to-notes` | + +**Recommendation:** Rename `web-to-notes` to `process-url` or add an alias. The current `web-to-notes` endpoint already does URL capture + LLM summary + Obsidian save. + +### 2. `summarize_pdf` - Extract and Summarize a PDF + +| Field | Value | +|-------|-------| +| **Status** | NOT IMPLEMENTED | +| **Gap** | No workflow exists to accept a PDF URL, extract text, and summarize it | + +**Required Implementation:** +- New workflow: `POST /webhook/summarize-pdf` +- Request: `{ "pdf_url": "string (required)", "title?": "string", "tags?": "string[]" }` +- Needs a PDF text extraction service (e.g., `pdftotext`, `pymupdf`, or an HTTP microservice) +- Then summarize with local LLM and save to Obsidian + +### 3. `add_reminder` - Add a Reminder/Task + +| Field | Value | +|-------|-------| +| **Status** | PARTIALLY IMPLEMENTED | +| **Gap** | `POST /webhook/openclaw-reminder` sends an immediate notification but does NOT persist the reminder. Action Bus `tasks_add` adds a Google Task but has no webhook-specific endpoint | + +**Current Coverage:** +- `POST /webhook/openclaw-reminder` - immediate Telegram + Discord notification (no persistence) +- Action Bus `tasks_add` - adds to Google Tasks (persistent) +- Action Bus `create_calendar_event` - creates calendar events with reminders + +**Recommendation:** Consider whether a unified `/webhook/add-reminder` endpoint should both persist AND notify. + +### 4. `sync_vault` - Trigger Obsidian Vault Sync/Reindex + +| Field | Value | +|-------|-------| +| **Status** | COVERED by host endpoint `POST :18810/reindex` | +| **Gap** | No n8n webhook exposes this; it's available as a direct host-side HTTP endpoint only. Also covered by the scheduled `Obsidian Vault Reindex` workflow (every 6 hours) | + +**Recommendation:** Either expose via Action Bus as a new action, or document that callers should use `POST http://172.19.0.1:18810/reindex` directly (host-side only, not externally accessible). To make it externally accessible, add a `sync_vault` action to the Action Bus. + +### 5. `run_health_check` - Trigger a Health Check of the Swarm + +| Field | Value | +|-------|-------| +| **Status** | PARTIALLY IMPLEMENTED | +| **Gap** | `GET :18809/health` returns container health but is host-side only. The `Swarm Health Watchdog` workflow (ID: `lDKocSFXBQWQrDd3`) runs on schedule but has no webhook trigger. No unified webhook endpoint for on-demand health checks | + +**Recommendation:** Add a `run_health_check` action to the Action Bus, or add a webhook trigger to the Swarm Health Watchdog workflow. + +### 6. `process_voice_memo` - Process a Voice Memo + +| Field | Value | +|-------|-------| +| **Status** | FULLY IMPLEMENTED as `POST /webhook/voice-memo` | +| **Gap** | None. This endpoint is fully operational | + +--- + +## Implementation Status Summary + +| Endpoint | Path | Method | Implemented | Workflow ID | +|----------|------|--------|-------------|-------------| +| OpenClaw Action Bus | `/webhook/openclaw-action` | POST | Yes | `Jwi54VWMdlLqYnRo` | +| Reminder Notification | `/webhook/openclaw-reminder` | POST | Yes | `RUR1CGn0ikkxbPin` | +| Web-to-Notes Capture | `/webhook/web-to-notes` | POST | Yes | `GSmzuA5dgGgyRg5v` | +| Voice Memo Capture | `/webhook/voice-memo` | POST | Yes | `El1BHJZ56JlzhrRZ` | +| Docker Container Health | `:18809/health` | GET | Yes | (host-side) | +| Obsidian Reindex | `:18810/reindex` | POST | Yes | (host-side) | +| Obsidian Reindex Health | `:18810/healthz` | GET | Yes | (host-side) | +| Obsidian Reindex Status | `:18810/reindex/status` | GET | Yes | (host-side) | +| **Summarize PDF** | `/webhook/summarize-pdf` | POST | **No** | - | +| **Health Check (webhook)** | via Action Bus | POST | **No** | - | +| **Vault Sync (webhook)** | via Action Bus | POST | **No** | - | + +### Action Bus Sub-actions Status + +The OpenClaw Action Bus already implements these actions internally: +- Email: `send_email_draft`, `list_email_drafts`, `delete_email_draft`, `send_gmail_draft`, `send_approved_email` +- Calendar: `create_calendar_event`, `list_upcoming_events`, `update_calendar_event`, `delete_calendar_event` +- Tasks: `tasks_add`, `tasks_list`, `tasks_done`, `tasks_delete` +- Drive: `drive_search`, `drive_upload`, `drive_download` +- Docs: `docs_list`, `docs_read`, `docs_create`, `docs_write`, `docs_export` +- Notifications: `notify`, `send_notification_draft` +- Approvals: `approval_queue_add`, `approval_queue_list`, `approval_queue_resolve`, `approval_history_attach_execution` +- Utility: `fetch_and_normalize_url`, `append_log`, `get_logs`, `inbound_event_filter` + +--- + +## Network Reference + +| From | To | Address | +|------|----|---------| +| External/Host | n8n Webhooks | `http://127.0.0.1:18808/webhook/{path}` | +| n8n (Docker) | Host services | `http://172.19.0.1:{port}/{path}` | +| n8n (Docker) | n8n internal | `http://127.0.0.1:5678/api/v1/` | +| n8n (Docker) | Obsidian REST | `http://172.19.0.1:27123/vault/{path}` | +| n8n (Docker) | Local LLM | `http://172.19.0.1:18806/v1/` | +| n8n (Docker) | Whisper | `http://172.19.0.1:18811/v1/audio/transcriptions` | + +--- + +## Authentication + +Two authentication patterns are used: + +1. **Header Auth** (`x-openclaw-secret`): Used by Action Bus and Reminder webhooks. The secret is stored in n8n credential `OpenClaw Webhook Header` (ID: `6sZd8ciia1fsItDd`) and referenced in `~/lab/swarm/openclaw/credentials/n8n.env` as `N8N_WEBHOOK_SECRET`. + +2. **No Auth**: Used by Web-to-Notes and Voice Memo webhooks. These are open endpoints (consider adding auth if exposed publicly). + +--- + +## Related + +- [[Infrastructure/Automation/n8n Workflows]] - Full workflow documentation +- [[Infrastructure/Architecture]] - Overall system architecture +- [[Infrastructure/Services/Docker Services]] - Docker service registry +- [[Infrastructure/Automation/Cron Jobs]] - Scheduled task documentation diff --git a/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/n8n Implementation Handoff.md b/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/n8n Implementation Handoff.md index b831355..e8621b9 100644 --- a/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/n8n Implementation Handoff.md +++ b/swarm-common/obsidian-vault/will/will-shared-zap/Infrastructure/Automation/n8n Implementation Handoff.md @@ -164,16 +164,16 @@ Last verified on 2026-05-13 (evening): - Status: active - Type: webhook - Current behavior: - - Accepts an audio URL. - - Downloads audio. - - Transcribes with local Whisper on `18811`. - - Summarizes with local llama.cpp. - - Writes transcript/summary/action items to Obsidian. - - Sends a Telegram notification. + - Accepts three ingress modes: `audio_url`, `telegram_file_id`, or `discord_audio_url`. + - Host-side processor on port `18813` (`voice-memo-processor.py`) handles download, Whisper transcription, and local LLM summarization. + - Optional Kokoro TTS read-back of summary (`include_tts: true`). + - Writes transcript/summary to Obsidian with YAML frontmatter including `source_type`. + - Sends Telegram notification with source type and optional TTS audio link. +- Host-side service: `~/lab/swarm/scripts/voice-memo-processor.py` on port `18813`. +- Systemd user service: `voice-memo-processor.service` (enabled). - Remaining improvement: - - Add native Telegram/Discord voice-message ingress instead of requiring an audio URL. - - Add optional Kokoro read-back of summary. - Add durable action-item routing to notes/task queue. + - Test end-to-end with real Telegram voice messages. ### Web-to-Notes Capture @@ -182,14 +182,14 @@ Last verified on 2026-05-13 (evening): - Type: webhook - Current behavior: - Accepts a URL. - - Fetches the page. - - Extracts readable text. + - Host-side content extractor on port `18812` (`url-content-extractor.py`) classifies and extracts content. + - Supports YouTube (transcript via `youtube-transcript-api`), PDF (text via `pymupdf`), and web (readable text via `readability-lxml`). - Summarizes with local llama.cpp. - - Writes markdown to Obsidian. + - Writes markdown to Obsidian with YAML frontmatter including `content_type`, `source_url`, `title`, `date`, and tags. +- Host-side service: `~/lab/swarm/scripts/url-content-extractor.py` on port `18812`. +- Systemd user service: `url-content-extractor.service` (enabled). - Remaining improvement: - - Add YouTube transcript handling. - - Add PDF handling. - - Add claim extraction and source metadata. + - Add claim extraction and source metadata enrichment. - Add optional Atlas/Hermes higher-quality synthesis for important captures. ### OpenClaw Action Bus / Reminder Webhook @@ -328,17 +328,14 @@ Recommended implementation: 6. ~~Fix stale container URLs in IMAP workflow.~~ Done 2026-05-13. 7. ~~Implement Obsidian Semantic Index.~~ Done 2026-05-13: ChromaDB `obsidian` collection, Ollama nomic-embed-text, automated reindex every 6h. -8. Upgrade Web-to-Notes Capture. - - Add PDF and YouTube transcript support. - - Add source metadata and claim extraction. +8. ~~Upgrade Web-to-Notes Capture.~~ Done 2026-05-13: host-side content extractor on :18812, supports YouTube/PDF/web, workflow updated. + - Remaining: claim extraction, Atlas/Hermes synthesis. -9. Upgrade Voice Memo Pipeline. - - Add native Telegram/Discord voice ingestion. - - Add optional Kokoro audio summary. +9. ~~Upgrade Voice Memo Pipeline.~~ Done 2026-05-13: host-side processor on :18813, Telegram/Discord voice ingress, Kokoro TTS read-back. + - Remaining: test with real Telegram voice, action-item routing. -10. Define webhook action bus catalog. - - Document stable endpoints and schemas. - - Add `process_url`, `summarize_pdf`, `add_reminder`, `sync_vault`, `run_health_check`. +10. ~~Define webhook action bus catalog.~~ Done 2026-05-13: catalog at `Infrastructure/Automation/Webhook Action Bus.md`. + - Remaining: implement `summarize_pdf`, `sync_vault`, `run_health_check` webhook wrappers. ## Verification commands @@ -360,6 +357,12 @@ curl -fsS --max-time 3 http://127.0.0.1:18809/health | python3 -m json.tool curl -fsS http://127.0.0.1:18810/healthz curl -fsS http://127.0.0.1:18810/reindex/status | python3 -m json.tool +# URL content extractor (Web-to-Notes) +curl -fsS http://127.0.0.1:18812/healthz + +# Voice memo processor +curl -fsS http://127.0.0.1:18813/healthz + # Verify from inside n8n container docker exec n8n-agent wget -qO- http://172.19.0.1:18809/health docker exec n8n-agent wget -qO- http://172.19.0.1:18810/healthz