ff28a7c1ad
- url-content-extractor.py on :18812: YouTube/PDF/web content extraction - voice-memo-processor.py on :18813: Telegram/Discord/URL voice ingress + Kokoro TTS - Webhook Action Bus catalog in Obsidian vault - Updated n8n Implementation Handoff: items #8-10 done
340 lines
10 KiB
Python
340 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
URL Content Extractor Endpoint
|
|
Lightweight HTTP server that classifies URLs and extracts content.
|
|
|
|
Supports:
|
|
- YouTube videos: extracts transcript via youtube-transcript-api
|
|
- PDF files: downloads and extracts text via pymupdf
|
|
- Web pages: fetches HTML and extracts readable text via readability-lxml
|
|
|
|
Listens on 0.0.0.0:18812 (configurable via PORT env var).
|
|
|
|
Endpoints:
|
|
POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata
|
|
GET /healthz -> returns ok
|
|
"""
|
|
|
|
import http.server
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import tempfile
|
|
import traceback
|
|
import urllib.request
|
|
import urllib.parse
|
|
import urllib.error
|
|
|
|
# TCP port the HTTP server binds to; the PORT environment variable overrides
# the default of 18812.
PORT = int(os.environ.get("PORT", 18812))
# Upper bound, in bytes, applied when downloading a document body.
MAX_CONTENT_SIZE = 50 * 1024 * 1024  # 50MB max download

# Regexes that recognise YouTube URLs. In each pattern, capture group 1 is the
# 11-character video ID (watch?v=, youtu.be/, /embed/ and /shorts/ forms).
YOUTUBE_PATTERNS = [
    re.compile(r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})'),
    re.compile(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})'),
]

# Lower-case path suffixes that mark a URL as a PDF download.
PDF_EXTENSIONS = ('.pdf',)
# MIME types that identify a PDF response.
PDF_CONTENT_TYPES = ('application/pdf',)
|
|
|
|
|
|
def _import_youtube():
    """Import youtube-transcript-api on demand and return its API class.

    The import statement runs only when this function is called, so an
    ImportError for the package surfaces at call time rather than at module
    load.

    Returns:
        The ``YouTubeTranscriptApi`` class.
    """
    import youtube_transcript_api as transcript_module

    return transcript_module.YouTubeTranscriptApi
|
|
|
|
def _import_fitz():
    """Import the ``fitz`` module on demand and return it.

    The import statement runs only when this function is called, so an
    ImportError for the package surfaces at call time rather than at module
    load.

    Returns:
        The imported ``fitz`` module object.
    """
    import fitz as pdf_module

    return pdf_module
|
|
|
|
def _import_readability():
    """Import the HTML-extraction helpers on demand.

    The import statements run only when this function is called, so an
    ImportError for either package surfaces at call time rather than at
    module load.

    Returns:
        A ``(Document, document_fromstring)`` tuple: ``readability.Document``
        and ``lxml.html.document_fromstring``.
    """
    import readability
    import lxml.html

    return readability.Document, lxml.html.document_fromstring
|
|
|
|
|
|
def classify_url(url: str) -> str:
    """Decide which extractor should handle *url*.

    Checks are applied in order: any YOUTUBE_PATTERNS match anywhere in the
    URL, then a path ending in one of PDF_EXTENSIONS (case-insensitive), then
    a known PDF-serving URL fragment.

    Returns:
        ``'youtube'``, ``'pdf'``, or ``'web'`` (the fallback).
    """
    # Parse up front so a malformed URL fails before any classification.
    parsed = urllib.parse.urlparse(url)

    if any(rx.search(url) for rx in YOUTUBE_PATTERNS):
        return 'youtube'

    if parsed.path.lower().endswith(PDF_EXTENSIONS):
        return 'pdf'

    # Hosts that serve PDFs from paths without a .pdf suffix.
    pdf_url_markers = ('arxiv.org/pdf/',)
    lowered_url = url.lower()
    if any(marker in lowered_url for marker in pdf_url_markers):
        return 'pdf'

    return 'web'
|
|
|
|
|
|
def extract_youtube_id(url: str) -> str | None:
    """Return the video ID captured by the first matching YouTube pattern.

    Patterns are tried in YOUTUBE_PATTERNS order; ``None`` is returned when
    none of them matches *url*.
    """
    candidates = (rx.search(url) for rx in YOUTUBE_PATTERNS)
    first_hit = next((hit for hit in candidates if hit), None)
    return first_hit.group(1) if first_hit else None
|
|
|
|
|
|
def fetch_youtube(url: str) -> dict:
    """Fetch the English transcript (and, best-effort, the title) of a video.

    Args:
        url: A YouTube URL from which extract_youtube_id() can pull an ID.

    Returns:
        On success, a dict with ``content_type`` ("youtube"), ``title``,
        ``text`` (transcript snippets joined by single spaces) and
        ``metadata`` (``video_id``, ``source_url``, ``transcript_entries``).
        On failure, a dict with ``error`` and ``content_type``.

    Raises:
        ImportError: if youtube-transcript-api is not installed (the lazy
            import happens before the error-trapping block).
    """
    # Function-scope import: only this block needs entity decoding.
    import html as html_entities

    YTTA = _import_youtube()
    video_id = extract_youtube_id(url)
    if not video_id:
        return {"error": "Could not extract YouTube video ID", "content_type": "youtube"}

    try:
        api = YTTA()
        transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])
        parts = [entry.text for entry in transcript_data]

        # The title lookup is deliberately best-effort: a transcript without a
        # pretty title is still useful, so any failure falls back to the ID.
        title = video_id
        try:
            req = urllib.request.Request(
                f"https://www.youtube.com/watch?v={video_id}",
                headers={"User-Agent": "Mozilla/5.0"},
            )
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(req, timeout=15) as resp:
                page = resp.read().decode('utf-8', errors='replace')
            m = re.search(r'<title>(.*?)</title>', page)
            if m:
                # <title> content is HTML-escaped (e.g. "&amp;"); decode it.
                cleaned = html_entities.unescape(m.group(1))
                cleaned = cleaned.replace(' - YouTube', '').strip()
                title = cleaned or video_id
        except Exception:
            pass

        return {
            "content_type": "youtube",
            "title": title,
            "text": " ".join(parts),
            "metadata": {
                "video_id": video_id,
                "source_url": url,
                "transcript_entries": len(parts),
            },
        }
    except Exception as e:
        return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"}
|
|
|
|
|
|
def _extract_pdf_text(fitz, data: bytes, url: str) -> dict:
    """Turn raw PDF bytes into the service's standard PDF result dict.

    Shared by fetch_pdf() and by fetch_web() when a page turns out to be a
    PDF, so both paths produce the same payload.

    Args:
        fitz: The module returned by _import_fitz().
        data: Complete PDF file contents.
        url: Source URL; its last path segment is the fallback title.

    Returns:
        Dict with ``content_type`` ("pdf"), ``title``, ``text`` (pages joined
        by blank lines) and ``metadata`` (``source_url``, ``author``,
        ``page_count``).
    """
    # Open from memory: avoids reopening a NamedTemporaryFile by name, which
    # is not possible on every platform while the file is still open.
    doc = fitz.open(stream=data, filetype="pdf")
    try:
        title = ""
        author = ""
        try:
            meta = doc.metadata or {}
            title = meta.get("title", "") or ""
            author = meta.get("author", "") or ""
        except Exception:
            # Metadata is optional; unreadable metadata must not fail the
            # text extraction.
            pass

        if not title:
            title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF"

        pages = [page.get_text() for page in doc]
    finally:
        # Release the document even when a page fails to render.
        doc.close()

    return {
        "content_type": "pdf",
        "title": title,
        "text": "\n\n".join(pages),
        "metadata": {
            "source_url": url,
            "author": author,
            "page_count": len(pages),
        },
    }


def fetch_pdf(url: str) -> dict:
    """Download a PDF and extract its text.

    Args:
        url: Location of the PDF.

    Returns:
        The dict built by _extract_pdf_text() on success, otherwise a dict
        with ``error`` and ``content_type`` ("pdf"). Downloads larger than
        MAX_CONTENT_SIZE are rejected.

    Raises:
        ImportError: if the fitz module is not installed (the lazy import
            happens before the error-trapping block).
    """
    fitz = _import_fitz()

    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=60) as resp:
            # Read one byte past the limit so oversize bodies are detectable.
            data = resp.read(MAX_CONTENT_SIZE + 1)
        if len(data) > MAX_CONTENT_SIZE:
            return {"error": "PDF too large (>50MB)", "content_type": "pdf"}

        return _extract_pdf_text(fitz, data, url)
    except Exception as e:
        return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"}


def fetch_web(url: str) -> dict:
    """Fetch a web page and extract its readable text.

    If the server answers with a PDF content type, the body is processed as a
    PDF instead and a PDF result is returned.

    Args:
        url: Page to fetch.

    Returns:
        On success, a dict with ``content_type`` ("web" or "pdf"), ``title``,
        ``text`` and ``metadata``. On failure, a dict with ``error`` and
        ``content_type``. Bodies larger than MAX_CONTENT_SIZE are rejected.

    Raises:
        ImportError: if readability-lxml / lxml are not installed (the lazy
            import happens before the error-trapping block).
    """
    Document, document_fromstring = _import_readability()

    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            content_type = resp.headers.get('Content-Type', '') or ''
            charset = resp.headers.get_content_charset()
            # Bounded read for every body type, not only PDFs.
            raw = resp.read(MAX_CONTENT_SIZE + 1)

        # Media types are case-insensitive, so compare in lower case.
        lowered_type = content_type.lower()
        is_pdf = any(pdf_type in lowered_type for pdf_type in PDF_CONTENT_TYPES)

        if len(raw) > MAX_CONTENT_SIZE:
            if is_pdf:
                return {"error": "PDF too large (>50MB)", "content_type": "pdf"}
            return {"error": "Page too large (>50MB)", "content_type": "web"}

        if is_pdf:
            return _extract_pdf_text(_import_fitz(), raw, url)

        # Honour the charset declared by the server; fall back to UTF-8 when
        # it is absent or names a codec Python does not know.
        try:
            html = raw.decode(charset or 'utf-8', errors='replace')
        except LookupError:
            html = raw.decode('utf-8', errors='replace')

        doc = Document(html)
        title = doc.title() or ""
        summary_html = doc.summary()

        # Convert HTML summary to plain text
        tree = document_fromstring(summary_html)
        text = tree.text_content()

        # Collapse runs of three or more newlines and trim the ends.
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = text.strip()

        return {
            "content_type": "web",
            "title": title,
            "text": text,
            "metadata": {
                "source_url": url,
            },
        }
    except Exception as e:
        return {"error": f"Web extraction failed: {e}", "content_type": "web"}
|
|
|
|
|
|
def extract_content(url: str) -> dict:
    """Route *url* to the extractor selected by classify_url().

    'youtube' goes to fetch_youtube, 'pdf' to fetch_pdf, and every other
    classification to fetch_web. The extractor's result dict is returned
    unchanged.
    """
    kind = classify_url(url)
    if kind == 'youtube':
        return fetch_youtube(url)
    if kind == 'pdf':
        return fetch_pdf(url)
    return fetch_web(url)
|
|
|
|
|
|
class ExtractorHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler exposing ``GET /healthz`` and ``POST /extract``.

    Every response is a JSON document. Per-request access logging is
    disabled; progress is reported with print() instead.

    NOTE(review): /extract fetches whatever http(s) URL the caller supplies,
    including loopback and private addresses. If this port is reachable from
    untrusted clients, add an allow-list or network-level restriction.
    """

    def do_GET(self):
        """Answer the health probe; any other path gets a JSON 404."""
        path = self.path.rstrip("/")
        if path == "/healthz":
            self._json_response({"status": "ok"})
        else:
            self._json_response({"error": "not found", "hint": "POST /extract with {url: ...}"}, status=404)

    def do_POST(self):
        """Handle ``POST /extract``.

        Expects a JSON object body with a string ``url`` field starting with
        http:// or https://. Responds 400 for malformed requests, 500 when
        extraction reports an error, and 200 with the extraction result
        otherwise.
        """
        path = self.path.rstrip("/")
        if path != "/extract":
            self._json_response({"error": "not found"}, status=404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
            if content_length < 0:
                # read(-1) would block until the client closes the socket.
                raise ValueError("negative Content-Length")
            body = self.rfile.read(content_length)
            data = json.loads(body) if body else {}
        except Exception as e:
            self._json_response({"error": f"Invalid request body: {e}"}, status=400)
            return

        # Valid JSON is not necessarily an object; reject lists, strings, etc.
        # here instead of crashing on .get() below.
        if not isinstance(data, dict):
            self._json_response({"error": "Request body must be a JSON object"}, status=400)
            return

        url = data.get("url")
        if url is None:
            url = ""
        if not isinstance(url, str):
            self._json_response({"error": "'url' field must be a string"}, status=400)
            return

        url = url.strip()
        if not url:
            self._json_response({"error": "Missing 'url' field"}, status=400)
            return

        if not url.startswith(("http://", "https://")):
            self._json_response({"error": "URL must start with http:// or https://"}, status=400)
            return

        print(f"Extracting: {url}", flush=True)
        try:
            result = extract_content(url)
        except Exception as e:
            # Keep the stack trace for operators; the client only gets the
            # short message.
            traceback.print_exc()
            result = {"error": f"Internal error: {e}"}

        if "error" in result:
            print(f"Error: {result['error']}", flush=True)
            self._json_response(result, status=500)
        else:
            ct = result.get("content_type", "?")
            tlen = len(result.get("text", ""))
            print(f"Success: {ct}, {tlen} chars", flush=True)
            self._json_response(result)

    def _json_response(self, data, status=200):
        """Serialise *data* as indented JSON and send it with *status*."""
        body = json.dumps(data, indent=2).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress the base class's per-request log lines."""
        pass
|
|
|
|
|
|
def main():
    """Run the extractor HTTP server on 0.0.0.0:PORT until interrupted.

    Ctrl-C (KeyboardInterrupt) stops the server quietly. The listening socket
    is closed on every exit path, including unexpected exceptions from the
    serve loop.
    """
    server = http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler)
    print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()


if __name__ == "__main__":
    main()
|