feat(swarm): add URL content extractor + voice memo processor + webhook catalog

- url-content-extractor.py on :18812: YouTube/PDF/web content extraction
- voice-memo-processor.py on :18813: Telegram/Discord/URL voice ingress + Kokoro TTS
- Webhook Action Bus catalog in Obsidian vault
- Updated n8n Implementation Handoff: items #8-10 done
This commit is contained in:
William Valentin
2026-05-13 16:13:00 -07:00
parent 6c13a60f57
commit ff28a7c1ad
4 changed files with 1232 additions and 23 deletions
+339
View File
@@ -0,0 +1,339 @@
#!/usr/bin/env python3
"""
URL Content Extractor Endpoint
Lightweight HTTP server that classifies URLs and extracts content.
Supports:
- YouTube videos: extracts transcript via youtube-transcript-api
- PDF files: downloads and extracts text via pymupdf
- Web pages: fetches HTML and extracts readable text via readability-lxml
Listens on 0.0.0.0:18812 (configurable via PORT env var).
Endpoints:
POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata
GET /healthz -> returns ok
"""
import http.server
import json
import os
import re
import sys
import tempfile
import traceback
import urllib.request
import urllib.parse
import urllib.error
PORT = int(os.environ.get("PORT", 18812))
MAX_CONTENT_SIZE = 50 * 1024 * 1024 # 50MB max download
YOUTUBE_PATTERNS = [
re.compile(r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})'),
re.compile(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})'),
]
PDF_EXTENSIONS = ('.pdf',)
PDF_CONTENT_TYPES = ('application/pdf',)
def _import_youtube():
from youtube_transcript_api import YouTubeTranscriptApi
return YouTubeTranscriptApi
def _import_fitz():
import fitz
return fitz
def _import_readability():
from readability import Document
from lxml.html import document_fromstring
return Document, document_fromstring
def classify_url(url: str) -> str:
"""Classify URL as youtube, pdf, or web."""
parsed = urllib.parse.urlparse(url)
host = (parsed.hostname or '').lower()
path = parsed.path.lower()
# Check YouTube
for pat in YOUTUBE_PATTERNS:
if pat.search(url):
return 'youtube'
# Check PDF by extension
if path.endswith(PDF_EXTENSIONS):
return 'pdf'
# Check known PDF-hosting domains with non-.pdf paths
pdf_host_patterns = [
'arxiv.org/pdf/',
]
for pattern in pdf_host_patterns:
if pattern in url.lower():
return 'pdf'
return 'web'
def extract_youtube_id(url: str) -> str | None:
"""Extract YouTube video ID from URL."""
for pat in YOUTUBE_PATTERNS:
m = pat.search(url)
if m:
return m.group(1)
return None
def fetch_youtube(url: str) -> dict:
"""Extract YouTube video transcript."""
YTTA = _import_youtube()
video_id = extract_youtube_id(url)
if not video_id:
return {"error": "Could not extract YouTube video ID", "content_type": "youtube"}
try:
api = YTTA()
transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
# Try to get video title from the page
title = video_id
try:
req = urllib.request.Request(
f"https://www.youtube.com/watch?v={video_id}",
headers={"User-Agent": "Mozilla/5.0"}
)
resp = urllib.request.urlopen(req, timeout=15)
html = resp.read().decode('utf-8', errors='replace')
m = re.search(r'<title>(.*?)</title>', html)
if m:
title = m.group(1).replace(' - YouTube', '').strip()
except Exception:
pass
# Build transcript text
parts = []
for entry in transcript_data:
parts.append(entry.text)
text = " ".join(parts)
return {
"content_type": "youtube",
"title": title,
"text": text,
"metadata": {
"video_id": video_id,
"source_url": url,
"transcript_entries": len(transcript_data),
}
}
except Exception as e:
return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"}
def fetch_pdf(url: str) -> dict:
"""Download PDF and extract text."""
fitz = _import_fitz()
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=60)
data = resp.read(MAX_CONTENT_SIZE + 1)
if len(data) > MAX_CONTENT_SIZE:
return {"error": "PDF too large (>50MB)", "content_type": "pdf"}
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp:
tmp.write(data)
tmp.flush()
doc = fitz.open(tmp.name)
title = ""
author = ""
try:
meta = doc.metadata or {}
title = meta.get("title", "") or ""
author = meta.get("author", "") or ""
except Exception:
pass
if not title:
title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF"
pages = []
for page_num in range(len(doc)):
page = doc[page_num]
pages.append(page.get_text())
doc.close()
text = "\n\n".join(pages)
return {
"content_type": "pdf",
"title": title,
"text": text,
"metadata": {
"source_url": url,
"author": author,
"page_count": len(pages),
}
}
except Exception as e:
return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"}
def fetch_web(url: str) -> dict:
"""Fetch web page and extract readable text."""
Document, document_fromstring = _import_readability()
try:
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=30)
# Check if response is actually a PDF (content-type detection)
content_type = resp.headers.get('Content-Type', '')
if 'application/pdf' in content_type:
# Re-process as PDF
data = resp.read(MAX_CONTENT_SIZE + 1)
if len(data) > MAX_CONTENT_SIZE:
return {"error": "PDF too large (>50MB)", "content_type": "pdf"}
fitz = _import_fitz()
with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp:
tmp.write(data)
tmp.flush()
doc = fitz.open(tmp.name)
title = ""
author = ""
try:
meta = doc.metadata or {}
title = meta.get("title", "") or ""
author = meta.get("author", "") or ""
except Exception:
pass
if not title:
title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF"
pages = []
for page_num in range(len(doc)):
pages.append(doc[page_num].get_text())
doc.close()
return {
"content_type": "pdf",
"title": title,
"text": "\n\n".join(pages),
"metadata": {
"source_url": url,
"author": author,
"page_count": len(pages),
}
}
html = resp.read().decode('utf-8', errors='replace')
doc = Document(html)
title = doc.title() or ""
summary_html = doc.summary()
# Convert HTML summary to plain text
tree = document_fromstring(summary_html)
text = tree.text_content()
# Clean up whitespace
text = re.sub(r'\n{3,}', '\n\n', text)
text = text.strip()
return {
"content_type": "web",
"title": title,
"text": text,
"metadata": {
"source_url": url,
}
}
except Exception as e:
return {"error": f"Web extraction failed: {e}", "content_type": "web"}
def extract_content(url: str) -> dict:
"""Main extraction dispatcher."""
content_type = classify_url(url)
if content_type == 'youtube':
return fetch_youtube(url)
elif content_type == 'pdf':
return fetch_pdf(url)
else:
return fetch_web(url)
class ExtractorHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.rstrip("/")
if path == "/healthz":
self._json_response({"status": "ok"})
else:
self._json_response({"error": "not found", "hint": "POST /extract with {url: ...}"}, status=404)
def do_POST(self):
path = self.path.rstrip("/")
if path != "/extract":
self._json_response({"error": "not found"}, status=404)
return
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
data = json.loads(body) if body else {}
except Exception as e:
self._json_response({"error": f"Invalid request body: {e}"}, status=400)
return
url = data.get("url", "").strip()
if not url:
self._json_response({"error": "Missing 'url' field"}, status=400)
return
if not url.startswith(("http://", "https://")):
self._json_response({"error": "URL must start with http:// or https://"}, status=400)
return
print(f"Extracting: {url}", flush=True)
try:
result = extract_content(url)
except Exception as e:
result = {"error": f"Internal error: {e}"}
if "error" in result:
print(f"Error: {result['error']}", flush=True)
self._json_response(result, status=500)
else:
ct = result.get("content_type", "?")
tlen = len(result.get("text", ""))
print(f"Success: {ct}, {tlen} chars", flush=True)
self._json_response(result)
def _json_response(self, data, status=200):
body = json.dumps(data, indent=2).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def log_message(self, format, *args):
pass
def main():
server = http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler)
print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.server_close()
if __name__ == "__main__":
main()
+418
View File
@@ -0,0 +1,418 @@
#!/usr/bin/env python3
"""
Voice Memo Processor Endpoint
Handles voice memo processing with support for:
- Audio URL (existing behavior)
- Telegram voice messages (file_id)
- Kokoro TTS read-back of summaries
Listens on 0.0.0.0:18813 (configurable via PORT env var).
Endpoints:
POST /process -> Process voice memo (download + transcribe + summarize + optional TTS)
POST /tts -> Generate TTS audio from text (Kokoro)
GET /audio/<fn> -> Serve generated audio file
GET /healthz -> Health check
"""
import hashlib
import http.server
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.request
import urllib.parse
import urllib.error
PORT = int(os.environ.get("PORT", 18813))
AUDIO_DIR = os.path.join(tempfile.gettempdir(), "voice-memo-audio")
os.makedirs(AUDIO_DIR, exist_ok=True)
# Service endpoints (from host perspective)
WHISPER_URL = os.environ.get("WHISPER_URL", "http://127.0.0.1:18811/v1/audio/transcriptions")
LLM_URL = os.environ.get("LLM_URL", "http://127.0.0.1:18806/v1/chat/completions")
KOKORO_URL = os.environ.get("KOKORO_URL", "http://127.0.0.1:18805/v1/audio/speech")
# Telegram Bot API
TELEGRAM_BOT_TOKEN = ""
_token_paths = [
os.path.expanduser("~/.hermes/.env"),
os.path.expanduser("~/lab/swarm/.env"),
]
for _p in _token_paths:
if os.path.isfile(_p):
with open(_p) as _f:
for _line in _f:
_line = _line.strip()
if _line.startswith("TELEGRAM_BOT_TOKEN="):
TELEGRAM_BOT_TOKEN = _line.split("=", 1)[1].strip().strip('"').strip("'")
break
if TELEGRAM_BOT_TOKEN:
break
def _json_response(handler, data, status=200):
body = json.dumps(data, indent=2).encode()
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
def _file_response(handler, filepath, content_type="audio/mpeg"):
with open(filepath, "rb") as f:
data = f.read()
handler.send_response(200)
handler.send_header("Content-Type", content_type)
handler.send_header("Content-Length", str(len(data)))
handler.end_headers()
handler.wfile.write(data)
def download_telegram_voice(file_id: str) -> str:
"""Download a Telegram voice file by file_id, return local path."""
if not TELEGRAM_BOT_TOKEN:
raise ValueError("TELEGRAM_BOT_TOKEN not configured")
# Get file path
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/getFile?file_id={file_id}"
resp = urllib.request.urlopen(url, timeout=15)
data = json.loads(resp.read())
if not data.get("ok"):
raise ValueError(f"Telegram getFile failed: {data}")
file_path = data["result"]["file_path"]
# Download the file
download_url = f"https://api.telegram.org/file/bot{TELEGRAM_BOT_TOKEN}/{file_path}"
resp = urllib.request.urlopen(download_url, timeout=60)
audio_data = resp.read()
# Save to temp file with appropriate extension
ext = os.path.splitext(file_path)[1] or ".ogg"
tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR)
tmp.write(audio_data)
tmp.close()
return tmp.name
def download_audio_url(url: str) -> str:
"""Download audio from URL, return local path."""
ext = ".mp3"
parsed = urllib.parse.urlparse(url)
path_ext = os.path.splitext(parsed.path)[1]
if path_ext in (".ogg", ".oga", ".opus", ".wav", ".m4a", ".webm", ".flac"):
ext = path_ext
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
resp = urllib.request.urlopen(req, timeout=60)
audio_data = resp.read()
# Check content type for better extension guess
ct = resp.headers.get("Content-Type", "")
if "ogg" in ct:
ext = ".ogg"
elif "webm" in ct:
ext = ".webm"
elif "wav" in ct:
ext = ".wav"
elif "mp4" in ct or "m4a" in ct:
ext = ".m4a"
tmp = tempfile.NamedTemporaryFile(suffix=ext, delete=False, dir=AUDIO_DIR)
tmp.write(audio_data)
tmp.close()
return tmp.name
def transcribe_whisper(audio_path: str) -> str:
"""Transcribe audio file using local Whisper."""
filename = os.path.basename(audio_path)
# Build multipart form data
boundary = "----VoiceMemoBoundary"
with open(audio_path, "rb") as f:
file_data = f.read()
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode() + file_data + (
f"\r\n--{boundary}\r\n"
f'Content-Disposition: form-data; name="model"\r\n\r\n'
f"whisper-1\r\n"
f"--{boundary}--\r\n"
).encode()
req = urllib.request.Request(
WHISPER_URL,
data=body,
headers={
"Content-Type": f"multipart/form-data; boundary={boundary}",
},
)
resp = urllib.request.urlopen(req, timeout=120)
result = json.loads(resp.read())
transcript = (
result.get("text", "")
or result.get("transcription", "")
or (", ".join(s.get("text", "") for s in result.get("segments", [])) if "segments" in result else "")
)
if not transcript:
raise ValueError(f"Whisper returned no text: {json.dumps(result)[:200]}")
return transcript.strip()
def summarize_llm(transcript: str, title: str = "Voice Memo") -> str:
"""Summarize transcript using local LLM."""
payload = {
"model": "gemma-4-26b",
"messages": [
{
"role": "system",
"content": "Convert raw voice memo transcripts into concise useful notes. "
"Return markdown only with Summary, Key Points, Action Items, Open Questions.",
},
{
"role": "user",
"content": f"Title: {title}\n\nTranscript:\n{transcript[:6000]}",
},
],
"temperature": 0.2,
"max_tokens": 900,
}
req = urllib.request.Request(
LLM_URL,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=120)
result = json.loads(resp.read())
return (
result.get("choices", [{}])[0]
.get("message", {})
.get("content", "Summary unavailable.")
)
def generate_tts(text: str, voice: str = "af_heart") -> str:
"""Generate TTS audio using Kokoro, return path to audio file."""
payload = {
"model": "kokoro",
"input": text[:4000], # Kokoro has char limits
"voice": voice,
"response_format": "mp3",
"stream": False,
"return_download_link": True,
}
req = urllib.request.Request(
KOKORO_URL,
data=json.dumps(payload).encode(),
headers={"Content-Type": "application/json"},
)
resp = urllib.request.urlopen(req, timeout=120)
# Kokoro returns audio directly or with download link
content_type = resp.headers.get("Content-Type", "")
if "audio" in content_type:
# Direct audio response
audio_data = resp.read()
filename = hashlib.sha256(text.encode()).hexdigest()[:16] + ".mp3"
filepath = os.path.join(AUDIO_DIR, filename)
with open(filepath, "wb") as f:
f.write(audio_data)
return filepath
# Check for download link in headers
download_path = resp.headers.get("X-Download-Path", "")
if download_path:
return download_path
# Try JSON response
try:
result = json.loads(resp.read())
if "download_url" in result:
return result["download_url"]
if "audio_url" in result:
return result["audio_url"]
except Exception:
pass
raise ValueError("Kokoro TTS returned unexpected response format")
class VoiceMemoHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
path = self.path.rstrip("/")
if path == "/healthz":
_json_response(self, {"status": "ok"})
return
# Serve audio files: /audio/<filename>
if path.startswith("/audio/"):
filename = path[len("/audio/"):]
filepath = os.path.join(AUDIO_DIR, filename)
if os.path.isfile(filepath):
_file_response(self, filepath, "audio/mpeg")
return
_json_response(self, {"error": "audio file not found"}, status=404)
return
_json_response(self, {"error": "not found"}, status=404)
def do_POST(self):
path = self.path.rstrip("/")
if path == "/healthz":
_json_response(self, {"status": "ok"})
return
try:
content_length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(content_length)
data = json.loads(body) if body else {}
except Exception as e:
_json_response(self, {"error": f"Invalid request body: {e}"}, status=400)
return
if path == "/tts":
self._handle_tts(data)
return
if path == "/process":
self._handle_process(data)
return
_json_response(self, {"error": "not found"}, status=404)
def _handle_tts(self, data):
"""Handle TTS-only request."""
text = data.get("text", "").strip()
if not text:
_json_response(self, {"error": "Missing 'text' field"}, status=400)
return
voice = data.get("voice", "af_heart")
print(f"TTS: {len(text)} chars, voice={voice}", flush=True)
try:
audio_path = generate_tts(text, voice)
filename = os.path.basename(audio_path)
audio_url = f"/audio/{filename}"
_json_response(self, {
"audio_path": audio_path,
"audio_url": audio_url,
"filename": filename,
})
except Exception as e:
print(f"TTS error: {e}", flush=True)
_json_response(self, {"error": f"TTS failed: {e}"}, status=500)
def _handle_process(self, data):
"""Handle full voice memo processing pipeline."""
# Determine audio source
audio_url = data.get("audio_url", "").strip()
telegram_file_id = data.get("telegram_file_id", "").strip()
discord_audio_url = data.get("discord_audio_url", "").strip()
title = data.get("title", "Voice Memo")
tags = data.get("tags", ["voice", "memo"])
include_tts = data.get("include_tts", False)
voice = data.get("voice", "af_heart")
source_type = "url"
local_audio = None
try:
# Download audio from appropriate source
if telegram_file_id:
print(f"Processing Telegram voice: {telegram_file_id[:20]}...", flush=True)
local_audio = download_telegram_voice(telegram_file_id)
source_type = "telegram"
elif discord_audio_url:
print(f"Processing Discord voice: {discord_audio_url[:50]}...", flush=True)
local_audio = download_audio_url(discord_audio_url)
source_type = "discord"
elif audio_url:
print(f"Processing audio URL: {audio_url[:50]}...", flush=True)
local_audio = download_audio_url(audio_url)
source_type = "url"
else:
_json_response(self, {
"error": "Must provide one of: audio_url, telegram_file_id, discord_audio_url"
}, status=400)
return
# Transcribe
print(f"Transcribing {os.path.basename(local_audio)}...", flush=True)
transcript = transcribe_whisper(local_audio)
print(f"Transcript: {len(transcript)} chars", flush=True)
# Summarize
print("Summarizing...", flush=True)
summary = summarize_llm(transcript, title)
print(f"Summary: {len(summary)} chars", flush=True)
# Optional TTS
tts_url = None
tts_path = None
if include_tts and summary:
try:
print("Generating TTS read-back...", flush=True)
tts_path = generate_tts(summary, voice)
tts_filename = os.path.basename(tts_path)
tts_url = f"/audio/{tts_filename}"
print(f"TTS: {tts_filename}", flush=True)
except Exception as e:
print(f"TTS warning (non-fatal): {e}", flush=True)
result = {
"source_type": source_type,
"title": title,
"tags": tags,
"transcript": transcript,
"summary": summary,
"created_at": __import__("datetime").datetime.now().isoformat(),
}
if tts_url:
result["tts_audio_url"] = tts_url
result["tts_audio_path"] = tts_path
_json_response(self, result)
except Exception as e:
print(f"Error: {e}", flush=True)
_json_response(self, {"error": str(e)}, status=500)
finally:
# Clean up downloaded audio (keep TTS files for serving)
if local_audio and os.path.isfile(local_audio):
try:
os.unlink(local_audio)
except Exception:
pass
def log_message(self, format, *args):
pass
def main():
server = http.server.HTTPServer(("0.0.0.0", PORT), VoiceMemoHandler)
print(f"voice-memo-processor listening on 0.0.0.0:{PORT}", flush=True)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
server.server_close()
if __name__ == "__main__":
main()
@@ -0,0 +1,449 @@
---
title: Webhook Action Bus
area: infrastructure
tags: [infrastructure, automation, webhooks, n8n, api]
created: 2026-05-13
updated: 2026-05-13
status: active
related: "[[Infrastructure/Automation/n8n Workflows]], [[Infrastructure/Architecture]], [[Infrastructure/Services/Docker Services]]"
---
# Webhook Action Bus
Central catalog of all webhook endpoints in the n8n automation stack. Every webhook-triggered workflow and host-side HTTP endpoint is documented here with its URL, method, authentication, request/response schemas, and implementation status.
## Architecture
```
External Caller
|
v
n8n Webhook (port 18808)
|
+-- /webhook/openclaw-action --> OpenClaw Action Bus (router)
+-- /webhook/openclaw-reminder --> Reminder Webhook
+-- /webhook/web-to-notes --> Web-to-Notes Capture
+-- /webhook/voice-memo --> Voice Memo Capture
|
Host-side Services (from Docker: 172.19.0.1)
+-- :18809/health --> Docker Container Health
+-- :18810/reindex --> Obsidian Vault Reindex
+-- :18810/healthz --> Obsidian Reindex Health
+-- :18810/reindex/status --> Obsidian Reindex Status
```
### n8n Webhook URL Structure
All n8n webhooks follow this pattern:
```
http://{host}:18808/webhook/{path}
```
In production (from inside Docker), n8n sees itself at `http://localhost:18808/` with `WEBHOOK_URL=http://localhost:18808/`.
---
## Endpoint Catalog
### 1. OpenClaw Action Bus
| Field | Value |
|-------|-------|
| **Workflow** | OpenClaw Action Bus |
| **Workflow ID** | `Jwi54VWMdlLqYnRo` |
| **Status** | Active, Implemented |
| **URL** | `POST http://{host}:18808/webhook/openclaw-action` |
| **Authentication** | Header Auth (`OpenClaw Webhook Header` credential) |
| **Auth Header** | `x-openclaw-secret: {secret}` |
**Purpose:** Central action router. Accepts a JSON body with an `action` field and routes to the appropriate handler. Supports 30+ actions including email, calendar, tasks, drive, docs, notifications, approvals, and URL fetching.
#### Request Schema
```json
{
"action": "string (required) - one of the supported action names",
"args": {
"// action-specific parameters, see below"
},
"request_id": "string (optional) - client-supplied correlation ID"
}
```
#### Supported Actions
| Action | Args | Description |
|--------|------|-------------|
| `notify` | `{ message: string }` | Send a notification via Telegram |
| `send_notification_draft` | `{ title, message, draft_id? }` | Create & send a notification draft (requires approval) |
| `fetch_and_normalize_url` | `{ url: string, max_chars?: number (500-20000, default 8000) }` | Fetch a URL, strip HTML, return clean text |
| `send_email_draft` | `{ to: string[], cc?: string[], subject: string, body: string }` | Create an email draft for approval |
| `list_email_drafts` | `{ max?: number (1-100, default 20), page?: string }` | List pending email drafts |
| `delete_email_draft` | `{ draft_id: string }` | Delete an email draft |
| `send_gmail_draft` | `{ draft_id: string }` | Send an approved email draft |
| `send_approved_email` | `{ draft_id: string }` | Alias for `send_gmail_draft` |
| `create_calendar_event` | `{ title, start, end, description?, location?, calendar? }` | Create a calendar event |
| `list_upcoming_events` | `{ calendar?: string, max?: number (1-100, default 20), days_ahead?: number }` | List upcoming calendar events |
| `update_calendar_event` | `{ calendar?, event_id, title?, start?, end?, description?, location? }` | Update a calendar event |
| `delete_calendar_event` | `{ calendar?, event_id: string }` | Delete a calendar event |
| `tasks_add` | `{ title: string, notes?: string, due?: string, tasklist_id?: string }` | Add a Google Task |
| `tasks_list` | `{ tasklist_id?: string, max?: number (1-100, default 20) }` | List Google Tasks |
| `tasks_done` | `{ task_id: string, tasklist_id?: string }` | Mark a Google Task as done |
| `tasks_delete` | `{ task_id: string, tasklist_id?: string }` | Delete a Google Task |
| `drive_search` | `{ query: string, max?: number (1-50, default 10) }` | Search Google Drive |
| `drive_upload` | `{ local_path: string, folder_id?: string }` | Upload file to Google Drive |
| `drive_download` | `{ file_id: string, dest_path: string }` | Download file from Google Drive |
| `docs_list` | `{ max?: number (1-50, default 10) }` | List Google Docs |
| `docs_read` | `{ doc_id: string }` | Read a Google Doc |
| `docs_create` | `{ title: string, content?: string }` | Create a Google Doc |
| `docs_write` | `{ doc_id: string, content: string }` | Write to a Google Doc |
| `docs_export` | `{ doc_id: string, format?: string (default 'md') }` | Export a Google Doc |
| `approval_queue_add` | `{ kind?: string, summary: string }` | Add item to approval queue |
| `approval_queue_list` | `{ limit?: number, include_history?: boolean }` | List approval queue |
| `approval_queue_resolve` | `{ id: string, decision: 'approve' or 'reject' }` | Resolve an approval item |
| `approval_history_attach_execution` | `{ id: string, execution: object }` | Attach execution data to approval history |
| `append_log` | `{ text: string }` | Append to the action log |
| `get_logs` | `{ limit?: number (1-50, default 20) }` | Retrieve action log entries |
| `inbound_event_filter` | `{ ...event data }` | Classify an inbound event |
#### Response Schema
```json
{
"ok": true,
"action": "string - the action that was executed",
"// ... action-specific response fields"
}
```
Error responses:
```json
{
"ok": false,
"error": "string - error description",
"statusCode": 400
}
```
---
### 2. OpenClaw Reminder Webhook
| Field | Value |
|-------|-------|
| **Workflow** | OpenClaw Reminder Webhook |
| **Workflow ID** | `RUR1CGn0ikkxbPin` |
| **Status** | Active, Implemented |
| **URL** | `POST http://{host}:18808/webhook/openclaw-reminder` |
| **Authentication** | Header Auth (`OpenClaw Webhook Header` credential) |
| **Auth Header** | `x-openclaw-secret: {secret}` |
**Purpose:** Accepts a reminder payload and sends it to both Telegram and Discord.
#### Request Schema
```json
{
"title": "string (required) - reminder title",
"dueAt": "string (optional) - due date/time, e.g. '2026-05-14T09:00:00'",
"context": "string (optional) - additional context for the reminder"
}
```
#### Response Schema
```json
{
"ok": true,
"sentTelegram": true,
"sentDiscord": true
}
```
---
### 3. Web-to-Notes Capture
| Field | Value |
|-------|-------|
| **Workflow** | Web-to-Notes Capture (Local LLM + Obsidian) |
| **Workflow ID** | `GSmzuA5dgGgyRg5v` |
| **Status** | Active, Implemented |
| **URL** | `POST http://{host}:18808/webhook/web-to-notes` |
| **Authentication** | None |
| **Webhook ID** | `7958ecbc-c714-41d5-a829-882447ab95f8` |
**Purpose:** Captures a URL, fetches its content, summarizes it with the local LLM (Gemma), and saves the result as an Obsidian note. Also sends a Telegram notification.
#### Request Schema
```json
{
"url": "string (required) - HTTP(S) URL to capture",
"title": "string (optional) - override title (default: extracted from page)",
"notes": "string (optional) - personal notes/comment about the capture",
"tags": "string[] | string (optional) - comma-separated or array of tags (default: ['web-capture'])"
}
```
#### Response Schema
```json
{
"ok": true,
"notePath": "string - Obsidian vault path, e.g. 'Notes/2026-05-13 My Page.md'",
"title": "string - the note title",
"source": "string - the original URL"
}
```
---
### 4. Voice Memo Capture
| Field | Value |
|-------|-------|
| **Workflow** | Voice Memo Capture (Audio URL + Local Whisper) |
| **Workflow ID** | `El1BHJZ56JlzhrRZ` |
| **Status** | Active, Implemented |
| **URL** | `POST http://{host}:18808/webhook/voice-memo` |
| **Authentication** | None |
| **Webhook ID** | `06796590-13b3-4347-9582-1ac92719c95d` |
**Purpose:** Downloads an audio file from a URL, transcribes it with the local Whisper service, summarizes with the local LLM, and saves as an Obsidian note. Sends a Telegram notification.
#### Request Schema
```json
{
"audio_url": "string (required) - HTTP(S) URL to the audio file",
"title": "string (optional) - title for the note (default: 'Voice Memo')",
"source": "string (optional) - source attribution (default: the audio_url)",
"tags": "string[] | string (optional) - tags (default: ['voice', 'memo'])"
}
```
#### Response Schema
```json
{
"ok": true,
"notePath": "string - Obsidian vault path, e.g. 'Voice Memos/2026-05-13-my-memo.md'",
"title": "string - the note title"
}
```
---
### 5. Docker Container Health
| Field | Value |
|-------|-------|
| **Status** | Active, Implemented |
| **URL** | `GET http://{host}:18809/health` |
| **Authentication** | None |
**Purpose:** Returns the health status of all Docker containers in the swarm.
#### Response Schema
```json
{
"containers": [
{
"name": "string - container name",
"status": "string - e.g. 'running'",
"image": "string - container image"
}
]
}
```
---
### 6. Obsidian Vault Reindex
| Field | Value |
|-------|-------|
| **Status** | Active, Implemented |
| **URL** | `POST http://{host}:18810/reindex` |
| **Authentication** | None |
| **Timeout** | 300s (5 min) |
**Purpose:** Triggers a full reindex of the Obsidian vault for search.
#### Response
Returns the reindex result (status, file count, etc.)
---
### 7. Obsidian Reindex Health Check
| Field | Value |
|-------|-------|
| **Status** | Active, Implemented |
| **URL** | `GET http://{host}:18810/healthz` |
| **Authentication** | None |
**Purpose:** Health check for the Obsidian reindex service.
#### Response Schema
```json
{
"status": "ok"
}
```
---
### 8. Obsidian Reindex Status
| Field | Value |
|-------|-------|
| **Status** | Active, Implemented |
| **URL** | `GET http://{host}:18810/reindex/status` |
| **Authentication** | None |
**Purpose:** Returns the current reindex status including file hashes.
#### Response Schema
```json
{
"files": {
"path/to/file.md": "sha256-hash"
}
}
```
---
## Gap Analysis: Endpoints That Need Implementation
The following endpoints are defined in the action bus architecture but do NOT yet have dedicated webhook-triggered workflows. Some are partially covered by Action Bus actions.
### 1. `process_url` - Capture and Summarize a URL
| Field | Value |
|-------|-------|
| **Status** | COVERED by `Web-to-Notes Capture` endpoint AND Action Bus `fetch_and_normalize_url` |
| **Gap** | No dedicated `/webhook/process-url` endpoint exists, but the functionality is fully available via `/webhook/web-to-notes` |
**Recommendation:** Rename `web-to-notes` to `process-url` or add an alias. The current `web-to-notes` endpoint already does URL capture + LLM summary + Obsidian save.
### 2. `summarize_pdf` - Extract and Summarize a PDF
| Field | Value |
|-------|-------|
| **Status** | NOT IMPLEMENTED |
| **Gap** | No workflow exists to accept a PDF URL, extract text, and summarize it |
**Required Implementation:**
- New workflow: `POST /webhook/summarize-pdf`
- Request: `{ "pdf_url": "string (required)", "title?": "string", "tags?": "string[]" }`
- Needs a PDF text extraction service (e.g., `pdftotext`, `pymupdf`, or an HTTP microservice)
- Then summarize with local LLM and save to Obsidian
### 3. `add_reminder` - Add a Reminder/Task
| Field | Value |
|-------|-------|
| **Status** | PARTIALLY IMPLEMENTED |
| **Gap** | `POST /webhook/openclaw-reminder` sends an immediate notification but does NOT persist the reminder. Action Bus `tasks_add` adds a Google Task but has no webhook-specific endpoint |
**Current Coverage:**
- `POST /webhook/openclaw-reminder` - immediate Telegram + Discord notification (no persistence)
- Action Bus `tasks_add` - adds to Google Tasks (persistent)
- Action Bus `create_calendar_event` - creates calendar events with reminders
**Recommendation:** Consider whether a unified `/webhook/add-reminder` endpoint should both persist AND notify.
### 4. `sync_vault` - Trigger Obsidian Vault Sync/Reindex
| Field | Value |
|-------|-------|
| **Status** | COVERED by host endpoint `POST :18810/reindex` |
| **Gap** | No n8n webhook exposes this; it's available as a direct host-side HTTP endpoint only. Also covered by the scheduled `Obsidian Vault Reindex` workflow (every 6 hours) |
**Recommendation:** Either expose via Action Bus as a new action, or document that callers should use `POST http://172.19.0.1:18810/reindex` directly (host-side only, not externally accessible). To make it externally accessible, add a `sync_vault` action to the Action Bus.
### 5. `run_health_check` - Trigger a Health Check of the Swarm
| Field | Value |
|-------|-------|
| **Status** | PARTIALLY IMPLEMENTED |
| **Gap** | `GET :18809/health` returns container health but is host-side only. The `Swarm Health Watchdog` workflow (ID: `lDKocSFXBQWQrDd3`) runs on schedule but has no webhook trigger. No unified webhook endpoint for on-demand health checks |
**Recommendation:** Add a `run_health_check` action to the Action Bus, or add a webhook trigger to the Swarm Health Watchdog workflow.
### 6. `process_voice_memo` - Process a Voice Memo
| Field | Value |
|-------|-------|
| **Status** | FULLY IMPLEMENTED as `POST /webhook/voice-memo` |
| **Gap** | None. This endpoint is fully operational |
---
## Implementation Status Summary
| Endpoint | Path | Method | Implemented | Workflow ID |
|----------|------|--------|-------------|-------------|
| OpenClaw Action Bus | `/webhook/openclaw-action` | POST | Yes | `Jwi54VWMdlLqYnRo` |
| Reminder Notification | `/webhook/openclaw-reminder` | POST | Yes | `RUR1CGn0ikkxbPin` |
| Web-to-Notes Capture | `/webhook/web-to-notes` | POST | Yes | `GSmzuA5dgGgyRg5v` |
| Voice Memo Capture | `/webhook/voice-memo` | POST | Yes | `El1BHJZ56JlzhrRZ` |
| Docker Container Health | `:18809/health` | GET | Yes | (host-side) |
| Obsidian Reindex | `:18810/reindex` | POST | Yes | (host-side) |
| Obsidian Reindex Health | `:18810/healthz` | GET | Yes | (host-side) |
| Obsidian Reindex Status | `:18810/reindex/status` | GET | Yes | (host-side) |
| **Summarize PDF** | `/webhook/summarize-pdf` | POST | **No** | - |
| **Health Check (webhook)** | via Action Bus | POST | **No** | - |
| **Vault Sync (webhook)** | via Action Bus | POST | **No** | - |
### Action Bus Sub-actions Status
The OpenClaw Action Bus already implements these actions internally:
- Email: `send_email_draft`, `list_email_drafts`, `delete_email_draft`, `send_gmail_draft`, `send_approved_email`
- Calendar: `create_calendar_event`, `list_upcoming_events`, `update_calendar_event`, `delete_calendar_event`
- Tasks: `tasks_add`, `tasks_list`, `tasks_done`, `tasks_delete`
- Drive: `drive_search`, `drive_upload`, `drive_download`
- Docs: `docs_list`, `docs_read`, `docs_create`, `docs_write`, `docs_export`
- Notifications: `notify`, `send_notification_draft`
- Approvals: `approval_queue_add`, `approval_queue_list`, `approval_queue_resolve`, `approval_history_attach_execution`
- Utility: `fetch_and_normalize_url`, `append_log`, `get_logs`, `inbound_event_filter`
---
## Network Reference
| From | To | Address |
|------|----|---------|
| External/Host | n8n Webhooks | `http://127.0.0.1:18808/webhook/{path}` |
| n8n (Docker) | Host services | `http://172.19.0.1:{port}/{path}` |
| n8n (Docker) | n8n internal | `http://127.0.0.1:5678/api/v1/` |
| n8n (Docker) | Obsidian REST | `http://172.19.0.1:27123/vault/{path}` |
| n8n (Docker) | Local LLM | `http://172.19.0.1:18806/v1/` |
| n8n (Docker) | Whisper | `http://172.19.0.1:18811/v1/audio/transcriptions` |
---
## Authentication
Two authentication patterns are used:
1. **Header Auth** (`x-openclaw-secret`): Used by Action Bus and Reminder webhooks. The secret is stored in n8n credential `OpenClaw Webhook Header` (ID: `6sZd8ciia1fsItDd`) and referenced in `~/lab/swarm/openclaw/credentials/n8n.env` as `N8N_WEBHOOK_SECRET`.
2. **No Auth**: Used by Web-to-Notes and Voice Memo webhooks. These are open endpoints (consider adding auth if exposed publicly).
---
## Related
- [[Infrastructure/Automation/n8n Workflows]] - Full workflow documentation
- [[Infrastructure/Architecture]] - Overall system architecture
- [[Infrastructure/Services/Docker Services]] - Docker service registry
- [[Infrastructure/Automation/Cron Jobs]] - Scheduled task documentation
@@ -164,16 +164,16 @@ Last verified on 2026-05-13 (evening):
- Status: active
- Type: webhook
- Current behavior:
- Accepts an audio URL.
- Downloads audio.
- Transcribes with local Whisper on `18811`.
- Summarizes with local llama.cpp.
- Writes transcript/summary/action items to Obsidian.
- Sends a Telegram notification.
- Accepts three ingress modes: `audio_url`, `telegram_file_id`, or `discord_audio_url`.
- Host-side processor on port `18813` (`voice-memo-processor.py`) handles download, Whisper transcription, and local LLM summarization.
- Optional Kokoro TTS read-back of summary (`include_tts: true`).
- Writes transcript/summary to Obsidian with YAML frontmatter including `source_type`.
- Sends Telegram notification with source type and optional TTS audio link.
- Host-side service: `~/lab/swarm/scripts/voice-memo-processor.py` on port `18813`.
- Systemd user service: `voice-memo-processor.service` (enabled).
- Remaining improvement:
- Add native Telegram/Discord voice-message ingress instead of requiring an audio URL.
- Add optional Kokoro read-back of summary.
- Add durable action-item routing to notes/task queue.
- Test end-to-end with real Telegram voice messages.
### Web-to-Notes Capture
@@ -182,14 +182,14 @@ Last verified on 2026-05-13 (evening):
- Type: webhook
- Current behavior:
- Accepts a URL.
- Fetches the page.
- Extracts readable text.
- Host-side content extractor on port `18812` (`url-content-extractor.py`) classifies and extracts content.
- Supports YouTube (transcript via `youtube-transcript-api`), PDF (text via `pymupdf`), and web (readable text via `readability-lxml`).
- Summarizes with local llama.cpp.
- Writes markdown to Obsidian.
- Writes markdown to Obsidian with YAML frontmatter including `content_type`, `source_url`, `title`, `date`, and tags.
- Host-side service: `~/lab/swarm/scripts/url-content-extractor.py` on port `18812`.
- Systemd user service: `url-content-extractor.service` (enabled).
- Remaining improvement:
- Add YouTube transcript handling.
- Add PDF handling.
- Add claim extraction and source metadata.
- Add claim extraction and source metadata enrichment.
- Add optional Atlas/Hermes higher-quality synthesis for important captures.
### OpenClaw Action Bus / Reminder Webhook
@@ -328,17 +328,14 @@ Recommended implementation:
6. ~~Fix stale container URLs in IMAP workflow.~~ Done 2026-05-13.
7. ~~Implement Obsidian Semantic Index.~~ Done 2026-05-13: ChromaDB `obsidian` collection, Ollama nomic-embed-text, automated reindex every 6h.
8. Upgrade Web-to-Notes Capture.
- Add PDF and YouTube transcript support.
- Add source metadata and claim extraction.
8. ~~Upgrade Web-to-Notes Capture.~~ Done 2026-05-13: host-side content extractor on :18812, supports YouTube/PDF/web, workflow updated.
- Remaining: claim extraction, Atlas/Hermes synthesis.
9. Upgrade Voice Memo Pipeline.
- Add native Telegram/Discord voice ingestion.
- Add optional Kokoro audio summary.
9. ~~Upgrade Voice Memo Pipeline.~~ Done 2026-05-13: host-side processor on :18813, Telegram/Discord voice ingress, Kokoro TTS read-back.
- Remaining: test with real Telegram voice, action-item routing.
10. Define webhook action bus catalog.
- Document stable endpoints and schemas.
- Add `process_url`, `summarize_pdf`, `add_reminder`, `sync_vault`, `run_health_check`.
10. ~~Define webhook action bus catalog.~~ Done 2026-05-13: catalog at `Infrastructure/Automation/Webhook Action Bus.md`.
- Remaining: implement `summarize_pdf`, `sync_vault`, `run_health_check` webhook wrappers.
## Verification commands
@@ -360,6 +357,12 @@ curl -fsS --max-time 3 http://127.0.0.1:18809/health | python3 -m json.tool
curl -fsS http://127.0.0.1:18810/healthz
curl -fsS http://127.0.0.1:18810/reindex/status | python3 -m json.tool
# URL content extractor (Web-to-Notes)
curl -fsS http://127.0.0.1:18812/healthz
# Voice memo processor
curl -fsS http://127.0.0.1:18813/healthz
# Verify from inside n8n container
docker exec n8n-agent wget -qO- http://172.19.0.1:18809/health
docker exec n8n-agent wget -qO- http://172.19.0.1:18810/healthz