#!/usr/bin/env python3 """ URL Content Extractor Endpoint Lightweight HTTP server that classifies URLs and extracts content. Supports: - YouTube videos: extracts transcript via youtube-transcript-api - PDF files: downloads and extracts text via pymupdf - Web pages: fetches HTML and extracts readable text via readability-lxml Listens on 0.0.0.0:18812 (configurable via PORT env var). Endpoints: POST /extract -> {url: "..."} -> JSON with content_type, title, text, metadata GET /healthz -> returns ok """ import http.server import json import os import re import sys import tempfile import traceback import urllib.request import urllib.parse import urllib.error PORT = int(os.environ.get("PORT", 18812)) MAX_CONTENT_SIZE = 50 * 1024 * 1024 # 50MB max download YOUTUBE_PATTERNS = [ re.compile(r'(?:youtube\.com/watch\?.*v=|youtu\.be/|youtube\.com/embed/)([a-zA-Z0-9_-]{11})'), re.compile(r'youtube\.com/shorts/([a-zA-Z0-9_-]{11})'), ] PDF_EXTENSIONS = ('.pdf',) PDF_CONTENT_TYPES = ('application/pdf',) def _import_youtube(): from youtube_transcript_api import YouTubeTranscriptApi return YouTubeTranscriptApi def _import_fitz(): import fitz return fitz def _import_readability(): from readability import Document from lxml.html import document_fromstring return Document, document_fromstring def classify_url(url: str) -> str: """Classify URL as youtube, pdf, or web.""" parsed = urllib.parse.urlparse(url) host = (parsed.hostname or '').lower() path = parsed.path.lower() # Check YouTube for pat in YOUTUBE_PATTERNS: if pat.search(url): return 'youtube' # Check PDF by extension if path.endswith(PDF_EXTENSIONS): return 'pdf' # Check known PDF-hosting domains with non-.pdf paths pdf_host_patterns = [ 'arxiv.org/pdf/', ] for pattern in pdf_host_patterns: if pattern in url.lower(): return 'pdf' return 'web' def extract_youtube_id(url: str) -> str | None: """Extract YouTube video ID from URL.""" for pat in YOUTUBE_PATTERNS: m = pat.search(url) if m: return m.group(1) return None def fetch_youtube(url: str) -> dict: """Extract YouTube video transcript.""" YTTA = _import_youtube() video_id = extract_youtube_id(url) if not video_id: return {"error": "Could not extract YouTube video ID", "content_type": "youtube"} try: api = YTTA() transcript_data = api.fetch(video_id, languages=['en', 'en-US', 'en-GB']) # Try to get video title from the page title = video_id try: req = urllib.request.Request( f"https://www.youtube.com/watch?v={video_id}", headers={"User-Agent": "Mozilla/5.0"} ) resp = urllib.request.urlopen(req, timeout=15) html = resp.read().decode('utf-8', errors='replace') m = re.search(r'(.*?)', html) if m: title = m.group(1).replace(' - YouTube', '').strip() except Exception: pass # Build transcript text parts = [] for entry in transcript_data: parts.append(entry.text) text = " ".join(parts) return { "content_type": "youtube", "title": title, "text": text, "metadata": { "video_id": video_id, "source_url": url, "transcript_entries": len(transcript_data), } } except Exception as e: return {"error": f"YouTube transcript extraction failed: {e}", "content_type": "youtube"} def fetch_pdf(url: str) -> dict: """Download PDF and extract text.""" fitz = _import_fitz() try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) resp = urllib.request.urlopen(req, timeout=60) data = resp.read(MAX_CONTENT_SIZE + 1) if len(data) > MAX_CONTENT_SIZE: return {"error": "PDF too large (>50MB)", "content_type": "pdf"} with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: tmp.write(data) tmp.flush() doc = fitz.open(tmp.name) title = "" author = "" try: meta = doc.metadata or {} title = meta.get("title", "") or "" author = meta.get("author", "") or "" except Exception: pass if not title: title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" pages = [] for page_num in range(len(doc)): page = doc[page_num] pages.append(page.get_text()) doc.close() text = "\n\n".join(pages) return { "content_type": "pdf", "title": title, "text": text, "metadata": { "source_url": url, "author": author, "page_count": len(pages), } } except Exception as e: return {"error": f"PDF extraction failed: {e}", "content_type": "pdf"} def fetch_web(url: str) -> dict: """Fetch web page and extract readable text.""" Document, document_fromstring = _import_readability() try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) resp = urllib.request.urlopen(req, timeout=30) # Check if response is actually a PDF (content-type detection) content_type = resp.headers.get('Content-Type', '') if 'application/pdf' in content_type: # Re-process as PDF data = resp.read(MAX_CONTENT_SIZE + 1) if len(data) > MAX_CONTENT_SIZE: return {"error": "PDF too large (>50MB)", "content_type": "pdf"} fitz = _import_fitz() with tempfile.NamedTemporaryFile(suffix='.pdf', delete=True) as tmp: tmp.write(data) tmp.flush() doc = fitz.open(tmp.name) title = "" author = "" try: meta = doc.metadata or {} title = meta.get("title", "") or "" author = meta.get("author", "") or "" except Exception: pass if not title: title = urllib.parse.urlparse(url).path.split('/')[-1] or "Untitled PDF" pages = [] for page_num in range(len(doc)): pages.append(doc[page_num].get_text()) doc.close() return { "content_type": "pdf", "title": title, "text": "\n\n".join(pages), "metadata": { "source_url": url, "author": author, "page_count": len(pages), } } html = resp.read().decode('utf-8', errors='replace') doc = Document(html) title = doc.title() or "" summary_html = doc.summary() # Convert HTML summary to plain text tree = document_fromstring(summary_html) text = tree.text_content() # Clean up whitespace text = re.sub(r'\n{3,}', '\n\n', text) text = text.strip() return { "content_type": "web", "title": title, "text": text, "metadata": { "source_url": url, } } except Exception as e: return {"error": f"Web extraction failed: {e}", "content_type": "web"} def extract_content(url: str) -> dict: """Main extraction dispatcher.""" content_type = classify_url(url) if content_type == 'youtube': return fetch_youtube(url) elif content_type == 'pdf': return fetch_pdf(url) else: return fetch_web(url) class ExtractorHandler(http.server.BaseHTTPRequestHandler): def do_GET(self): path = self.path.rstrip("/") if path == "/healthz": self._json_response({"status": "ok"}) else: self._json_response({"error": "not found", "hint": "POST /extract with {url: ...}"}, status=404) def do_POST(self): path = self.path.rstrip("/") if path != "/extract": self._json_response({"error": "not found"}, status=404) return try: content_length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(content_length) data = json.loads(body) if body else {} except Exception as e: self._json_response({"error": f"Invalid request body: {e}"}, status=400) return url = data.get("url", "").strip() if not url: self._json_response({"error": "Missing 'url' field"}, status=400) return if not url.startswith(("http://", "https://")): self._json_response({"error": "URL must start with http:// or https://"}, status=400) return print(f"Extracting: {url}", flush=True) try: result = extract_content(url) except Exception as e: result = {"error": f"Internal error: {e}"} if "error" in result: print(f"Error: {result['error']}", flush=True) self._json_response(result, status=500) else: ct = result.get("content_type", "?") tlen = len(result.get("text", "")) print(f"Success: {ct}, {tlen} chars", flush=True) self._json_response(result) def _json_response(self, data, status=200): body = json.dumps(data, indent=2).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def log_message(self, format, *args): pass def main(): server = http.server.HTTPServer(("0.0.0.0", PORT), ExtractorHandler) print(f"url-content-extractor listening on 0.0.0.0:{PORT}", flush=True) try: server.serve_forever() except KeyboardInterrupt: pass server.server_close() if __name__ == "__main__": main()