Add new skill for semantic search across personal state files and external documentation using ChromaDB and sentence-transformers.

Components:
- search.py: Main search interface (--index, --top-k flags)
- index_personal.py: Index ~/.claude/state files
- index_docs.py: Index external docs (git repos)
- add_doc_source.py: Manage doc sources
- test_rag.py: Test suite (5/5 passing)

Features:
- Two indexes: personal (116 chunks) and docs (k0s: 846 chunks)
- all-MiniLM-L6-v2 embeddings (384 dimensions)
- ChromaDB persistent storage
- JSON output with ranked results and metadata

Documentation:
- Added to component-registry.json with triggers
- Added /rag command alias
- Updated skills/README.md
- Resolved fc-013 (vector database for agent memory)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
420 lines
12 KiB
Python
Executable File
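The commit message above lists the moving parts of the skill; this file implements the external-documentation indexer. It reads its source list from references/sources.json (SOURCES_FILE below), clones or updates each git source, chunks the Markdown files by header, embeds the chunks with all-MiniLM-L6-v2, and stores them in a persistent ChromaDB collection named "docs". A minimal source entry is sketched here as a Python/JSON literal; the field names are the ones this script reads (id, name, type, url, version, path, glob, base_url), while the id, URLs, and other values are illustrative placeholders, not the repository's actual configuration.

{
    "sources": [
        {
            "id": "example-docs",
            "name": "Example Project Docs",
            "type": "git",
            "url": "https://example.com/example-project.git",
            "version": "HEAD",
            "path": "docs",
            "glob": "**/*.md",
            "base_url": "https://example.com/docs/"
        }
    ]
}

With an entry like that saved, running ./index_docs.py --source example-docs re-indexes one source, --all rebuilds the whole collection, --list prints the configured sources, and --stats emits the summary as JSON (all flags are defined in main() below).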
#!/usr/bin/env python3
"""
RAG Search - Documentation Index Builder

Indexes external documentation sources for semantic search.
Supports git repos and local directories.
"""

import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional

# Add venv site-packages to path
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
    sys.path.insert(0, str(VENV_PATH))

import chromadb
from sentence_transformers import SentenceTransformer

# Constants
SKILL_DIR = Path(__file__).parent.parent
SOURCES_FILE = SKILL_DIR / "references" / "sources.json"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
DOCS_CACHE_DIR = DATA_DIR / "docs-cache"
MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "docs"

# Chunking parameters
CHUNK_SIZE = 500  # Target tokens (roughly 4 chars per token)
CHUNK_OVERLAP = 50
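# CHUNK_SIZE is an approximate token budget: chunk_markdown() splits a section
# once it exceeds CHUNK_SIZE * 4 characters, and carries roughly
# CHUNK_OVERLAP // 10 lines of overlap into the next chunk.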

def load_sources() -> list[dict]:
    """Load configured documentation sources."""
    if not SOURCES_FILE.exists():
        return []
    with open(SOURCES_FILE) as f:
        data = json.load(f)
    return data.get("sources", [])


def save_sources(sources: list[dict]) -> None:
    """Save documentation sources."""
    SOURCES_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(SOURCES_FILE, "w") as f:
        json.dump({"sources": sources}, f, indent=2)


def fetch_git_source(source: dict, quiet: bool = False) -> Optional[Path]:
    """
    Clone or update a git repository.

    Returns:
        Path to the docs directory within the repo
    """
    source_id = source["id"]
    url = source["url"]
    version = source.get("version", "HEAD")
    doc_path = source.get("path", "")

    cache_dir = DOCS_CACHE_DIR / source_id

    if cache_dir.exists():
        # Update existing repo
        if not quiet:
            print(f" Updating {source_id}...")
        try:
            subprocess.run(
                ["git", "fetch", "--all"],
                cwd=cache_dir,
                capture_output=True,
                check=True
            )
            subprocess.run(
                ["git", "checkout", version],
                cwd=cache_dir,
                capture_output=True,
                check=True
            )
            subprocess.run(
                ["git", "pull", "--ff-only"],
                cwd=cache_dir,
                capture_output=True,
                check=False  # May fail on tags
            )
        except subprocess.CalledProcessError as e:
            print(f" Warning: Could not update {source_id}: {e}", file=sys.stderr)
    else:
        # Clone new repo
        if not quiet:
            print(f" Cloning {source_id}...")
        cache_dir.parent.mkdir(parents=True, exist_ok=True)
        try:
            subprocess.run(
                ["git", "clone", "--depth", "1", url, str(cache_dir)],
                capture_output=True,
                check=True
            )
            if version != "HEAD":
                subprocess.run(
                    ["git", "fetch", "--depth", "1", "origin", version],
                    cwd=cache_dir,
                    capture_output=True,
                    check=True
                )
                subprocess.run(
                    ["git", "checkout", version],
                    cwd=cache_dir,
                    capture_output=True,
                    check=True
                )
        except subprocess.CalledProcessError as e:
            print(f" Error: Could not clone {source_id}: {e}", file=sys.stderr)
            return None

    docs_dir = cache_dir / doc_path if doc_path else cache_dir
    return docs_dir if docs_dir.exists() else None


def chunk_markdown(content: str, file_path: str) -> Generator[tuple[str, dict], None, None]:
    """
    Chunk markdown content for embedding.

    Strategy:
    - Split by headers to preserve context
    - Chunk sections that are too long
    - Preserve header hierarchy in metadata
    """
    lines = content.split("\n")
    current_chunk = []
    current_headers = []
    chunk_start_line = 0

    def emit_chunk() -> Optional[tuple[str, dict]]:
        if not current_chunk:
            return None
        text = "\n".join(current_chunk).strip()
        if len(text) < 20:
            return None

        metadata = {
            "file": file_path,
            "headers": " > ".join(current_headers) if current_headers else ""
        }
        return (text, metadata)

    for i, line in enumerate(lines):
        # Check for header
        header_match = re.match(r'^(#{1,6})\s+(.+)$', line)

        if header_match:
            # Emit current chunk before new header
            chunk = emit_chunk()
            if chunk:
                yield chunk
            current_chunk = []

            # Update header hierarchy
            level = len(header_match.group(1))
            header_text = header_match.group(2).strip()

            # Trim headers to current level
            current_headers = current_headers[:level-1]
            current_headers.append(header_text)

            chunk_start_line = i

        current_chunk.append(line)

        # Check if chunk is getting too large (rough token estimate)
        chunk_text = "\n".join(current_chunk)
        if len(chunk_text) > CHUNK_SIZE * 4:
            chunk = emit_chunk()
            if chunk:
                yield chunk
            # Start new chunk with overlap
            overlap_lines = current_chunk[-CHUNK_OVERLAP // 10:] if len(current_chunk) > CHUNK_OVERLAP // 10 else []
            current_chunk = overlap_lines

    # Emit final chunk
    chunk = emit_chunk()
    if chunk:
        yield chunk
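# Example (illustrative) of iterating over the chunks of a small document:
#
#   doc = "# Guide\n\nA short introduction paragraph.\n\n## Setup\n\nInstall the required packages first."
#   for text, meta in chunk_markdown(doc, "example/guide.md"):
#       print(meta["headers"], "->", len(text), "chars")
#
# Each chunk records its file path and a "Guide > Setup"-style header trail;
# fragments shorter than 20 characters are dropped, and any section longer than
# CHUNK_SIZE * 4 characters is split with a few lines of overlap.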

def index_source(
    source: dict,
    model: SentenceTransformer,
    quiet: bool = False
) -> tuple[list[str], list[list[float]], list[dict], list[str]]:
    """
    Index a single documentation source.

    Returns:
        (chunks, embeddings, metadatas, ids)
    """
    source_id = source["id"]
    source_type = source.get("type", "git")
    glob_pattern = source.get("glob", "**/*.md")

    if source_type == "git":
        docs_dir = fetch_git_source(source, quiet=quiet)
        if not docs_dir:
            return [], [], [], []
    elif source_type == "local":
        docs_dir = Path(source["path"]).expanduser()
        if not docs_dir.exists():
            print(f" Warning: Local path does not exist: {docs_dir}", file=sys.stderr)
            return [], [], [], []
    else:
        print(f" Warning: Unknown source type: {source_type}", file=sys.stderr)
        return [], [], [], []

    chunks = []
    metadatas = []
    ids = []

    # Find and process files
    files = list(docs_dir.glob(glob_pattern))
    if not quiet:
        print(f" Found {len(files)} files matching {glob_pattern}")

    for file_path in files:
        try:
            content = file_path.read_text(encoding="utf-8", errors="ignore")
        except IOError:
            continue

        rel_path = str(file_path.relative_to(docs_dir))
        full_path = f"{source_id}/{rel_path}"

        for chunk_text, metadata in chunk_markdown(content, full_path):
            chunk_id = f"docs_{source_id}_{len(chunks)}"
            chunks.append(chunk_text)
            metadata["source_id"] = source_id
            metadata["source_name"] = source.get("name", source_id)
            if source.get("version"):
                metadata["version"] = source["version"]
            if source.get("base_url"):
                metadata["url"] = source["base_url"]
            metadatas.append(metadata)
            ids.append(chunk_id)

    if not quiet:
        print(f" Indexed {len(chunks)} chunks from {source_id}")

    return chunks, [], metadatas, ids
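# The second element of the returned tuple stays empty here: index_docs() below
# embeds every collected chunk in one batched model.encode() call before writing
# to ChromaDB, which is why the model argument is not used in index_source().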

def index_docs(
    source_id: Optional[str] = None,
    all_sources: bool = False,
    quiet: bool = False
) -> dict:
    """
    Index documentation sources.

    Args:
        source_id: Index only this source
        all_sources: Index all configured sources
        quiet: Suppress progress output

    Returns:
        Summary statistics
    """
    sources = load_sources()
    if not sources:
        return {"error": "No documentation sources configured"}

    # Filter sources
    if source_id:
        sources = [s for s in sources if s["id"] == source_id]
        if not sources:
            return {"error": f"Source not found: {source_id}"}
    elif not all_sources:
        return {"error": "Specify --source <id> or --all"}

    if not quiet:
        print(f"Indexing {len(sources)} documentation source(s)")

    # Initialize model and client
    model = SentenceTransformer(MODEL_NAME)
    CHROMA_DIR.mkdir(parents=True, exist_ok=True)
    client = chromadb.PersistentClient(path=str(CHROMA_DIR))

    # Get or create collection
    try:
        collection = client.get_collection(COLLECTION_NAME)
        # When indexing all sources, drop and rebuild the collection;
        # a single-source run adds to the existing data
        if all_sources:
            client.delete_collection(COLLECTION_NAME)
            collection = client.create_collection(
                name=COLLECTION_NAME,
                metadata={"description": "External documentation"}
            )
    except Exception:
        collection = client.create_collection(
            name=COLLECTION_NAME,
            metadata={"description": "External documentation"}
        )

    # Process each source
    all_chunks = []
    all_metadatas = []
    all_ids = []

    for source in sources:
        if not quiet:
            print(f"\nProcessing: {source['name']}")

        chunks, _, metadatas, ids = index_source(source, model, quiet=quiet)
        all_chunks.extend(chunks)
        all_metadatas.extend(metadatas)
        all_ids.extend(ids)

        # Update last_indexed timestamp
        source["last_indexed"] = datetime.now().isoformat()

    # Batch embed and add to collection
    if all_chunks:
        if not quiet:
            print(f"\nEmbedding {len(all_chunks)} chunks...")

        embeddings = model.encode(all_chunks, show_progress_bar=not quiet).tolist()

        # Add in batches
        batch_size = 100
        for i in range(0, len(all_chunks), batch_size):
            end_idx = min(i + batch_size, len(all_chunks))
            collection.add(
                documents=all_chunks[i:end_idx],
                embeddings=embeddings[i:end_idx],
                metadatas=all_metadatas[i:end_idx],
                ids=all_ids[i:end_idx]
            )

    # Save updated sources with timestamps
    all_sources = load_sources()
    for source in sources:
        for s in all_sources:
            if s["id"] == source["id"]:
                s["last_indexed"] = source["last_indexed"]
                break
    save_sources(all_sources)

    stats = {
        "collection": COLLECTION_NAME,
        "sources_processed": len(sources),
        "chunks_indexed": len(all_chunks),
        "indexed_at": datetime.now().isoformat()
    }

    if not quiet:
        print(f"\nIndexed {len(all_chunks)} chunks from {len(sources)} source(s)")

    return stats


def main():
    parser = argparse.ArgumentParser(
        description="Index external documentation for RAG search"
    )
    parser.add_argument(
        "--source", "-s",
        help="Index only this source ID"
    )
    parser.add_argument(
        "--all", "-a",
        action="store_true",
        dest="all_sources",
        help="Index all configured sources"
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Suppress progress output"
    )
    parser.add_argument(
        "--list", "-l",
        action="store_true",
        help="List configured sources"
    )
    parser.add_argument(
        "--stats",
        action="store_true",
        help="Output stats as JSON"
    )

    args = parser.parse_args()

    if args.list:
        sources = load_sources()
        if sources:
            print(json.dumps(sources, indent=2))
        else:
            print("No documentation sources configured")
            print("Add sources with: add_doc_source.py")
        return

    stats = index_docs(
        source_id=args.source,
        all_sources=args.all_sources,
        quiet=args.quiet
    )

    if args.stats or "error" in stats:
        print(json.dumps(stats, indent=2))


if __name__ == "__main__":
    main()
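A minimal programmatic usage sketch, assuming the script is importable as a module named index_docs and its venv dependencies (chromadb, sentence-transformers) are available; it only reuses the index_docs() function defined above:

# Rebuild the "docs" collection from every configured source.
from index_docs import index_docs

stats = index_docs(all_sources=True, quiet=True)
print(stats)  # {"collection": "docs", "sources_processed": ..., "chunks_indexed": ..., "indexed_at": ...}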