#!/usr/bin/env python3
"""
RAG Search - Documentation Index Builder

Indexes external documentation sources for semantic search.
Supports git repos and local directories.
"""

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional

# Add venv site-packages to path
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
    sys.path.insert(0, str(VENV_PATH))

import chromadb
from sentence_transformers import SentenceTransformer

# Constants
SKILL_DIR = Path(__file__).parent.parent
SOURCES_FILE = SKILL_DIR / "references" / "sources.json"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
DOCS_CACHE_DIR = DATA_DIR / "docs-cache"
MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "docs"

# Chunking parameters
CHUNK_SIZE = 500  # Target tokens (roughly 4 chars per token)
CHUNK_OVERLAP = 50
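
# The sources.json file read by load_sources() is expected to look roughly like
# the example below. The keys shown ("id", "name", "type", "url", "version",
# "path", "glob", "base_url") are the ones this script reads; the concrete
# values are illustrative only.
#
#   {
#     "sources": [
#       {
#         "id": "example-docs",
#         "name": "Example Docs",
#         "type": "git",
#         "url": "https://github.com/example/example-docs",
#         "version": "HEAD",
#         "path": "docs",
#         "glob": "**/*.md",
#         "base_url": "https://example.com/docs"
#       }
#     ]
#   }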

def load_sources() -> list[dict]:
    """Load configured documentation sources."""
    if not SOURCES_FILE.exists():
        return []
    with open(SOURCES_FILE) as f:
        data = json.load(f)
    return data.get("sources", [])


def save_sources(sources: list[dict]) -> None:
    """Save documentation sources."""
    SOURCES_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(SOURCES_FILE, "w") as f:
        json.dump({"sources": sources}, f, indent=2)


def fetch_git_source(source: dict, quiet: bool = False) -> Optional[Path]:
    """
    Clone or update a git repository.

    Returns:
        Path to the docs directory within the repo
    """
    source_id = source["id"]
    url = source["url"]
    version = source.get("version", "HEAD")
    doc_path = source.get("path", "")
    cache_dir = DOCS_CACHE_DIR / source_id

    if cache_dir.exists():
        # Update existing repo
        if not quiet:
            print(f" Updating {source_id}...")
        try:
            subprocess.run(
                ["git", "fetch", "--all"],
                cwd=cache_dir, capture_output=True, check=True
            )
            subprocess.run(
                ["git", "checkout", version],
                cwd=cache_dir, capture_output=True, check=True
            )
            subprocess.run(
                ["git", "pull", "--ff-only"],
                cwd=cache_dir, capture_output=True,
                check=False  # May fail on tags
            )
        except subprocess.CalledProcessError as e:
            print(f" Warning: Could not update {source_id}: {e}", file=sys.stderr)
    else:
        # Clone new repo
        if not quiet:
            print(f" Cloning {source_id}...")
        cache_dir.parent.mkdir(parents=True, exist_ok=True)
        try:
            subprocess.run(
                ["git", "clone", "--depth", "1", url, str(cache_dir)],
                capture_output=True, check=True
            )
            if version != "HEAD":
                subprocess.run(
                    ["git", "fetch", "--depth", "1", "origin", version],
                    cwd=cache_dir, capture_output=True, check=True
                )
                subprocess.run(
                    ["git", "checkout", version],
                    cwd=cache_dir, capture_output=True, check=True
                )
        except subprocess.CalledProcessError as e:
            print(f" Error: Could not clone {source_id}: {e}", file=sys.stderr)
            return None

    docs_dir = cache_dir / doc_path if doc_path else cache_dir
    return docs_dir if docs_dir.exists() else None


def chunk_markdown(content: str, file_path: str) -> Generator[tuple[str, dict], None, None]:
    """
    Chunk markdown content for embedding.

    Strategy:
    - Split by headers to preserve context
    - Chunk sections that are too long
    - Preserve header hierarchy in metadata
    """
    lines = content.split("\n")
    current_chunk = []
    current_headers = []
    chunk_start_line = 0

    def emit_chunk() -> Optional[tuple[str, dict]]:
        if not current_chunk:
            return None
        text = "\n".join(current_chunk).strip()
        if len(text) < 20:
            return None
        metadata = {
            "file": file_path,
            "headers": " > ".join(current_headers) if current_headers else ""
        }
        return (text, metadata)

    for i, line in enumerate(lines):
        # Check for header
        header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
        if header_match:
            # Emit current chunk before new header
            chunk = emit_chunk()
            if chunk:
                yield chunk
            current_chunk = []

            # Update header hierarchy
            level = len(header_match.group(1))
            header_text = header_match.group(2).strip()
            # Trim headers to current level
            current_headers = current_headers[:level - 1]
            current_headers.append(header_text)
            chunk_start_line = i

        current_chunk.append(line)

        # Check if chunk is getting too large (rough token estimate)
        chunk_text = "\n".join(current_chunk)
        if len(chunk_text) > CHUNK_SIZE * 4:
            chunk = emit_chunk()
            if chunk:
                yield chunk
            # Start new chunk with overlap
            overlap_lines = current_chunk[-CHUNK_OVERLAP // 10:] if len(current_chunk) > CHUNK_OVERLAP // 10 else []
            current_chunk = overlap_lines

    # Emit final chunk
    chunk = emit_chunk()
    if chunk:
        yield chunk
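
# Illustrative behavior of chunk_markdown() (doctest-style, not executed here;
# the sample text is made up). Sections shorter than 20 characters are dropped,
# and the "headers" metadata records the header path of each chunk:
#
#   >>> sample = "# Guide\n\n## Install\n\nRun pip install foo.\n"
#   >>> [meta["headers"] for _, meta in chunk_markdown(sample, "guide.md")]
#   ['Guide > Install']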

def index_source(
    source: dict,
    model: SentenceTransformer,
    quiet: bool = False
) -> tuple[list[str], list[list[float]], list[dict], list[str]]:
    """
    Index a single documentation source.

    Returns:
        (chunks, embeddings, metadatas, ids); the embeddings list is left
        empty here because index_docs() encodes all chunks in one batch.
    """
    source_id = source["id"]
    source_type = source.get("type", "git")
    glob_pattern = source.get("glob", "**/*.md")

    if source_type == "git":
        docs_dir = fetch_git_source(source, quiet=quiet)
        if not docs_dir:
            return [], [], [], []
    elif source_type == "local":
        docs_dir = Path(source["path"]).expanduser()
        if not docs_dir.exists():
            print(f" Warning: Local path does not exist: {docs_dir}", file=sys.stderr)
            return [], [], [], []
    else:
        print(f" Warning: Unknown source type: {source_type}", file=sys.stderr)
        return [], [], [], []

    chunks = []
    metadatas = []
    ids = []

    # Find and process files
    files = list(docs_dir.glob(glob_pattern))
    if not quiet:
        print(f" Found {len(files)} files matching {glob_pattern}")

    for file_path in files:
        try:
            content = file_path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            continue

        rel_path = str(file_path.relative_to(docs_dir))
        full_path = f"{source_id}/{rel_path}"

        for chunk_text, metadata in chunk_markdown(content, full_path):
            chunk_id = f"docs_{source_id}_{len(chunks)}"
            chunks.append(chunk_text)
            metadata["source_id"] = source_id
            metadata["source_name"] = source.get("name", source_id)
            if source.get("version"):
                metadata["version"] = source["version"]
            if source.get("base_url"):
                metadata["url"] = source["base_url"]
            metadatas.append(metadata)
            ids.append(chunk_id)

    if not quiet:
        print(f" Indexed {len(chunks)} chunks from {source_id}")

    return chunks, [], metadatas, ids


def index_docs(
    source_id: Optional[str] = None,
    all_sources: bool = False,
    quiet: bool = False
) -> dict:
    """
    Index documentation sources.

    Args:
        source_id: Index only this source
        all_sources: Index all configured sources
        quiet: Suppress progress output

    Returns:
        Summary statistics
    """
    sources = load_sources()
    if not sources:
        return {"error": "No documentation sources configured"}

    # Filter sources
    if source_id:
        sources = [s for s in sources if s["id"] == source_id]
        if not sources:
            return {"error": f"Source not found: {source_id}"}
    elif not all_sources:
        return {"error": "Specify --source or --all"}

    if not quiet:
        print(f"Indexing {len(sources)} documentation source(s)")

    # Initialize model and client
    model = SentenceTransformer(MODEL_NAME)
    CHROMA_DIR.mkdir(parents=True, exist_ok=True)
    client = chromadb.PersistentClient(path=str(CHROMA_DIR))

    # Get or create collection, clearing any data that is about to be re-indexed
    try:
        collection = client.get_collection(COLLECTION_NAME)
        if all_sources:
            # Full re-index: rebuild the collection from scratch
            client.delete_collection(COLLECTION_NAME)
            collection = client.create_collection(
                name=COLLECTION_NAME,
                metadata={"description": "External documentation"}
            )
        else:
            # Partial re-index: drop existing chunks for the selected sources so
            # collection.add() does not collide with previously used IDs
            for source in sources:
                collection.delete(where={"source_id": source["id"]})
    except Exception:
        collection = client.create_collection(
            name=COLLECTION_NAME,
            metadata={"description": "External documentation"}
        )

    # Process each source
    all_chunks = []
    all_metadatas = []
    all_ids = []

    for source in sources:
        if not quiet:
            print(f"\nProcessing: {source['name']}")
        chunks, _, metadatas, ids = index_source(source, model, quiet=quiet)
        all_chunks.extend(chunks)
        all_metadatas.extend(metadatas)
        all_ids.extend(ids)
        # Update last_indexed timestamp
        source["last_indexed"] = datetime.now().isoformat()

    # Batch embed and add to collection
    if all_chunks:
        if not quiet:
            print(f"\nEmbedding {len(all_chunks)} chunks...")
        embeddings = model.encode(all_chunks, show_progress_bar=not quiet).tolist()

        # Add in batches
        batch_size = 100
        for i in range(0, len(all_chunks), batch_size):
            end_idx = min(i + batch_size, len(all_chunks))
            collection.add(
                documents=all_chunks[i:end_idx],
                embeddings=embeddings[i:end_idx],
                metadatas=all_metadatas[i:end_idx],
                ids=all_ids[i:end_idx]
            )

    # Save updated sources with timestamps
    saved_sources = load_sources()
    for source in sources:
        for s in saved_sources:
            if s["id"] == source["id"]:
                s["last_indexed"] = source["last_indexed"]
                break
    save_sources(saved_sources)

    stats = {
        "collection": COLLECTION_NAME,
        "sources_processed": len(sources),
        "chunks_indexed": len(all_chunks),
        "indexed_at": datetime.now().isoformat()
    }

    if not quiet:
        print(f"\nIndexed {len(all_chunks)} chunks from {len(sources)} source(s)")

    return stats


def main():
    parser = argparse.ArgumentParser(
        description="Index external documentation for RAG search"
    )
    parser.add_argument(
        "--source", "-s",
        help="Index only this source ID"
    )
    parser.add_argument(
        "--all", "-a",
        action="store_true",
        dest="all_sources",
        help="Index all configured sources"
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Suppress progress output"
    )
    parser.add_argument(
        "--list", "-l",
        action="store_true",
        help="List configured sources"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Output stats as JSON"
    )

    args = parser.parse_args()

    if args.list:
        sources = load_sources()
        if sources:
            print(json.dumps(sources, indent=2))
        else:
            print("No documentation sources configured")
            print("Add sources with: add_doc_source.py")
        return

    stats = index_docs(
        source_id=args.source,
        all_sources=args.all_sources,
        quiet=args.quiet
    )

    if args.stats or "error" in stats:
        print(json.dumps(stats, indent=2))


if __name__ == "__main__":
    main()
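
# For reference, a minimal sketch of how the collection built by this script
# could be queried from the search side. This is illustrative only and assumes
# the query is embedded with the same MODEL_NAME used above; it is not part of
# this script's CLI.
#
#   def search_docs(query: str, n_results: int = 5) -> dict:
#       client = chromadb.PersistentClient(path=str(CHROMA_DIR))
#       collection = client.get_collection(COLLECTION_NAME)
#       model = SentenceTransformer(MODEL_NAME)
#       embedding = model.encode([query]).tolist()
#       return collection.query(query_embeddings=embedding, n_results=n_results)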