Files
swarm-master/scripts/url-content-extractor.py
T
2026-06-04 13:26:50 -07:00

340 lines
10 KiB
Python

#!/usr/bin/env python3
"""
URL Content Extractor Endpoint
Lightweight HTTP server that classifies URLs and extracts content.
Supports:
- YouTube videos: extracts transcript via youtube-transcript-api
- PDF files: downloads and extracts text via pymupdf
- Web pages: fetches HTML and extracts readable text via readability-lxml
Listens on 0.0.0.0:18812 (configurable via PORT env var).
Endpoints:
POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata
GET /healthz -> returns ok
"""
import http.server
import json
import os
import re
import sys
import tempfile
import traceback
import urllib.request
import urllib.parse
import urllib.error
# TCP port the server binds to; the PORT environment variable overrides the default.
PORT = int(os.getenv("PORT", "18812"))

# Byte ceiling used when reading downloaded PDF bodies.
MAX_CONTENT_SIZE = 50 * 1024 ** 2  # 50MB max download

# Every pattern captures the 11-character video ID in group 1.
YOUTUBE_PATTERNS = [
    re.compile(pattern)
    for pattern in (
        r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})',
        r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})',
    )
]

# URL path suffixes treated as PDF; kept as a tuple because it is handed to str.endswith.
PDF_EXTENSIONS = ('.pdf',)

# MIME types that denote a PDF response body.
PDF_CONTENT_TYPES = ('application/pdf',)
def _import_youtube():
    """Import ``youtube_transcript_api`` on demand and return its ``YouTubeTranscriptApi`` class.

    The import runs when this function is called rather than at module load,
    so an ImportError surfaces only on the code path that needs the package.
    """
    import youtube_transcript_api

    return youtube_transcript_api.YouTubeTranscriptApi
def _import_fitz():
    """Import the ``fitz`` module on demand and return it.

    The import runs when this function is called rather than at module load,
    so an ImportError surfaces only on the code path that needs the package.
    """
    import fitz as fitz_module

    return fitz_module
def _import_readability():
    """Import the HTML-extraction dependencies on demand.

    Returns:
        A ``(Document, document_fromstring)`` pair: ``readability.Document``
        and ``lxml.html.document_fromstring``.
    """
    import readability
    import lxml.html

    return readability.Document, lxml.html.document_fromstring
def classify_url(url: str) -> str:
    """Classify *url* as ``'youtube'``, ``'pdf'``, or ``'web'``.

    Host-specific rules are anchored to the parsed hostname, so a URL that
    merely mentions ``youtu.be/...`` or ``arxiv.org/pdf/`` in its query string,
    or that lives on a lookalike domain such as ``notyoutube.com``, is not
    misclassified.

    Args:
        url: Absolute URL to classify.

    Returns:
        ``'youtube'`` when the host is a YouTube domain and one of
        ``YOUTUBE_PATTERNS`` matches; ``'pdf'`` when the path ends with a PDF
        extension or the URL is an arXiv ``/pdf/`` link; ``'web'`` otherwise.
    """
    parsed = urllib.parse.urlparse(url)
    host = (parsed.hostname or '').lower()
    path = parsed.path.lower()

    def host_is(domain: str) -> bool:
        # Accept the bare domain and any subdomain (www., m., music., export., ...).
        return host == domain or host.endswith('.' + domain)

    # YouTube: require both a YouTube host and an extractable video ID, so that
    # every URL classified here can also be handled by the ID extraction patterns.
    if (host_is('youtube.com') or host_is('youtu.be')) and any(
        pat.search(url) for pat in YOUTUBE_PATTERNS
    ):
        return 'youtube'

    # PDF by file extension.
    if path.endswith(PDF_EXTENSIONS):
        return 'pdf'

    # Known PDF-hosting domains whose paths do not end in .pdf.
    if host_is('arxiv.org') and path.startswith('/pdf/'):
        return 'pdf'

    return 'web'
def extract_youtube_id(url: str) -> str | None:
    """Return the video ID captured by the first matching YouTube pattern.

    Patterns are tried in the order they appear in ``YOUTUBE_PATTERNS``;
    ``None`` is returned when none of them matches *url*.
    """
    candidates = (pattern.search(url) for pattern in YOUTUBE_PATTERNS)
    first_hit = next((hit for hit in candidates if hit is not None), None)
    if first_hit is None:
        return None
    return first_hit.group(1)
def _fetch_youtube_title(video_id: str) -> str:
    """Best-effort lookup of a video's title from its watch page.

    Returns *video_id* itself when the page cannot be fetched, has no
    ``<title>`` element, or the title is empty after cleanup.
    """
    # Function-scope import: the module does not import ``html`` at the top.
    from html import unescape

    req = urllib.request.Request(
        f"https://www.youtube.com/watch?v={video_id}",
        headers={"User-Agent": "Mozilla/5.0"},
    )
    try:
        # ``with`` closes the connection even when read/decode fails.
        with urllib.request.urlopen(req, timeout=15) as resp:
            page = resp.read().decode('utf-8', errors='replace')
    except Exception:
        # The title is optional metadata; a failed lookup must not fail the
        # transcript extraction, so fall back to the video ID.
        return video_id

    m = re.search(r'<title>(.*?)</title>', page)
    if not m:
        return video_id
    # Decode entities such as &amp; / &#39; before stripping the site suffix.
    title = unescape(m.group(1)).replace(' - YouTube', '').strip()
    return title or video_id


def fetch_youtube(url: str) -> dict:
    """Extract the transcript of the YouTube video referenced by *url*.

    Args:
        url: A YouTube URL from which a video ID can be extracted.

    Returns:
        On success, a dict with ``content_type`` (``"youtube"``), ``title``,
        ``text`` (all transcript entries joined by single spaces) and
        ``metadata`` (``video_id``, ``source_url``, ``transcript_entries``).
        On failure, a dict with ``error`` and ``content_type``.
    """
    YTTA = _import_youtube()
    video_id = extract_youtube_id(url)
    if not video_id:
        return {"error": "Could not extract YouTube video ID", "content_type": "youtube"}

    try:
        transcript_data = YTTA().fetch(video_id, languages=['en', 'en-US', 'en-GB'])
        parts = [entry.text for entry in transcript_data]
    except Exception as e:
        return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"}

    # Only look up the title once the transcript is known to exist.
    title = _fetch_youtube_title(video_id)

    return {
        "content_type": "youtube",
        "title": title,
        "text": " ".join(parts),
        "metadata": {
            "video_id": video_id,
            "source_url": url,
            "transcript_entries": len(parts),
        },
    }
def _pdf_result(fitz, data: bytes, url: str) -> dict:
    """Build the extraction result for a PDF held in memory.

    Shared by ``fetch_pdf`` and by ``fetch_web`` (for URLs that turn out to
    serve a PDF), so both paths produce identical output.

    Args:
        fitz: The imported ``fitz`` module.
        data: Raw response bytes, read with a limit of ``MAX_CONTENT_SIZE + 1``
            so an oversized download is detectable here.
        url: Source URL; its last path segment is the fallback title.

    Returns:
        A dict with ``content_type`` (``"pdf"``), ``title``, ``text`` (page
        texts joined by blank lines) and ``metadata``; or an error dict when
        *data* exceeds ``MAX_CONTENT_SIZE``.

    Raises:
        Whatever ``fitz`` raises for unreadable input; callers convert that
        into their own error dict.
    """
    if len(data) > MAX_CONTENT_SIZE:
        return {"error": "PDF too large (>50MB)", "content_type": "pdf"}

    # Open straight from memory: no temp file to write, and no reopening of a
    # still-open NamedTemporaryFile by name (which fails on Windows).
    doc = fitz.open(stream=data, filetype="pdf")
    try:
        title = ""
        author = ""
        try:
            meta = doc.metadata or {}
            title = meta.get("title", "") or ""
            author = meta.get("author", "") or ""
        except Exception:
            # Metadata is optional; unreadable metadata must not block text extraction.
            pass
        pages = [page.get_text() for page in doc]
    finally:
        # Release the document even when a page fails to extract.
        doc.close()

    if not title:
        title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF"

    return {
        "content_type": "pdf",
        "title": title,
        "text": "\n\n".join(pages),
        "metadata": {
            "source_url": url,
            "author": author,
            "page_count": len(pages),
        },
    }


def fetch_pdf(url: str) -> dict:
    """Download the PDF at *url* and extract its text.

    Returns:
        The PDF result dict on success, or a dict with ``error`` and
        ``content_type`` (``"pdf"``) when the download or extraction fails or
        the file exceeds ``MAX_CONTENT_SIZE``.
    """
    fitz = _import_fitz()
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        # ``with`` closes the connection once the body has been read.
        with urllib.request.urlopen(req, timeout=60) as resp:
            # One byte over the limit is enough to detect an oversized file.
            data = resp.read(MAX_CONTENT_SIZE + 1)
        return _pdf_result(fitz, data, url)
    except Exception as e:
        return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"}


def fetch_web(url: str) -> dict:
    """Fetch the page at *url* and extract its readable text.

    If the server answers with a PDF content type, the body is processed as a
    PDF instead and a PDF result dict is returned.

    Returns:
        On success, a dict with ``content_type`` (``"web"``), ``title``,
        ``text`` and ``metadata`` (``source_url``). On failure, a dict with
        ``error`` and ``content_type``.
    """
    Document, document_fromstring = _import_readability()
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=30) as resp:
            # Header values are case-insensitive; normalise before matching.
            content_type = resp.headers.get('Content-Type', '').lower()
            charset = resp.headers.get_content_charset() or 'utf-8'
            # Apply the same download ceiling to HTML as to PDFs.
            data = resp.read(MAX_CONTENT_SIZE + 1)

        # The response is actually a PDF (content-type detection).
        if any(pdf_type in content_type for pdf_type in PDF_CONTENT_TYPES):
            return _pdf_result(_import_fitz(), data, url)

        if len(data) > MAX_CONTENT_SIZE:
            return {"error": "Page too large (>50MB)", "content_type": "web"}

        try:
            # Honour the charset declared by the server.
            html = data.decode(charset, errors='replace')
        except LookupError:
            # Unknown or misspelled charset name: fall back to UTF-8.
            html = data.decode('utf-8', errors='replace')

        doc = Document(html)
        title = doc.title() or ""
        summary_html = doc.summary()

        # Convert the HTML summary to plain text.
        tree = document_fromstring(summary_html)
        text = tree.text_content()

        # Collapse runs of three or more newlines into a single blank line.
        text = re.sub(r'\n{3,}', '\n\n', text).strip()

        return {
            "content_type": "web",
            "title": title,
            "text": text,
            "metadata": {
                "source_url": url,
            },
        }
    except Exception as e:
        return {"error": f"Web extraction failed: {e}", "content_type": "web"}
def extract_content(url: str) -> dict:
    """Route *url* to the extractor that matches its classification.

    YouTube and PDF URLs go to their dedicated fetchers; every other
    classification is handled as a generic web page. The chosen fetcher's
    result dict is returned unchanged.
    """
    kind = classify_url(url)
    dispatch = {
        'youtube': fetch_youtube,
        'pdf': fetch_pdf,
    }
    fetcher = dispatch.get(kind, fetch_web)
    return fetcher(url)
class ExtractorHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler exposing ``GET /healthz`` and ``POST /extract``.

    All responses are JSON. Malformed requests are answered with a 4xx status
    instead of raising inside the handler.

    NOTE(review): any http(s) URL supplied by the client is fetched
    server-side; confirm the service is not reachable by untrusted clients.
    """

    # Largest request body accepted by POST /extract; the payload is a small JSON object.
    MAX_BODY_BYTES = 1024 * 1024

    def do_GET(self):
        """Answer the health check; every other path is a 404 with a usage hint."""
        path = self.path.rstrip("/")
        if path == "/healthz":
            self._json_response({"status": "ok"})
        else:
            self._json_response(
                {"error": "not found", "hint": "POST /extract with {url: ...}"},
                status=404,
            )

    def do_POST(self):
        """Handle ``POST /extract``: validate the JSON body and run the extraction.

        Responds 404 for other paths, 400 for an unreadable or invalid body,
        413 for an oversized body, 500 when extraction reports an error, and
        200 with the extraction result otherwise.
        """
        path = self.path.rstrip("/")
        if path != "/extract":
            self._json_response({"error": "not found"}, status=404)
            return

        try:
            content_length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError) as e:
            self._json_response({"error": f"Invalid request body: {e}"}, status=400)
            return
        if content_length < 0:
            # rfile.read() with a negative size would block until EOF.
            self._json_response(
                {"error": "Invalid request body: negative Content-Length"},
                status=400,
            )
            return
        if content_length > self.MAX_BODY_BYTES:
            self._json_response({"error": "Request body too large"}, status=413)
            return

        try:
            body = self.rfile.read(content_length)
            data = json.loads(body) if body else {}
        except Exception as e:
            self._json_response({"error": f"Invalid request body: {e}"}, status=400)
            return

        # Valid JSON is not necessarily an object; lists, strings and numbers
        # have no .get() and would otherwise crash the handler.
        if not isinstance(data, dict):
            self._json_response({"error": "Request body must be a JSON object"}, status=400)
            return

        url = data.get("url", "")
        if not isinstance(url, str):
            self._json_response({"error": "'url' must be a string"}, status=400)
            return
        url = url.strip()
        if not url:
            self._json_response({"error": "Missing 'url' field"}, status=400)
            return
        if not url.startswith(("http://", "https://")):
            self._json_response({"error": "URL must start with http:// or https://"}, status=400)
            return

        print(f"Extracting: {url}", flush=True)
        try:
            result = extract_content(url)
        except Exception as e:
            # Keep the stack trace in the server log; the client only gets the message.
            traceback.print_exc()
            result = {"error": f"Internal error: {e}"}

        if "error" in result:
            print(f"Error: {result['error']}", flush=True)
            self._json_response(result, status=500)
        else:
            ct = result.get("content_type", "?")
            tlen = len(result.get("text", ""))
            print(f"Success: {ct}, {tlen} chars", flush=True)
            self._json_response(result)

    def _json_response(self, data, status=200):
        """Serialise *data* as indented JSON and send it with the given HTTP *status*."""
        body = json.dumps(data, indent=2).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Suppress the base class's per-request log lines; do_POST prints its own."""
        pass
def main():
    """Serve extraction requests on 0.0.0.0:PORT until interrupted.

    The server handles one request at a time. Ctrl-C (KeyboardInterrupt)
    stops it quietly.
    """
    # The context manager calls server_close() on every exit path, so the
    # listening socket is released even if serve_forever() raises something
    # other than KeyboardInterrupt.
    with http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler) as server:
        print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True)
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            pass


if __name__ == "__main__":
    main()