7ca8caeecb Implement rag-search skill for semantic search
Add new skill for semantic search across personal state files and
external documentation using ChromaDB and sentence-transformers.

Components:
- search.py: Main search interface (--index, --top-k flags)
- index_personal.py: Index ~/.claude/state files
- index_docs.py: Index external docs (git repos)
- add_doc_source.py: Manage doc sources
- test_rag.py: Test suite (5/5 passing)

Features:
- Two indexes: personal (116 chunks) and docs (k0s: 846 chunks)
- all-MiniLM-L6-v2 embeddings (384 dimensions)
- ChromaDB persistent storage
- JSON output with ranked results and metadata

Documentation:
- Added to component-registry.json with triggers
- Added /rag command alias
- Updated skills/README.md
- Resolved fc-013 (vector database for agent memory)
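
Example of the query path (a sketch only, not the actual search.py code;
the model, storage path, and collection name below match this skill's
indexers, while the query text and result count are illustrative):

    from pathlib import Path
    import chromadb
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")    # 384-dim embeddings
    client = chromadb.PersistentClient(
        path=str(Path.home() / ".claude" / "data" / "rag-search" / "chroma")
    )
    docs = client.get_collection("docs")               # the external-docs index
    query_vec = model.encode(["how do I upgrade a k0s cluster?"]).tolist()
    results = docs.query(query_embeddings=query_vec, n_results=5)
    # results holds ranked documents, metadatas, and distances, which the
    # skill reports as JSON output with ranked results and metadata.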

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 23:41:38 -08:00

#!/usr/bin/env python3
"""
RAG Search - Documentation Index Builder
Indexes external documentation sources for semantic search.
Supports git repos and local directories.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional
# Add the skill's venv site-packages to sys.path (path assumes a Python 3.13 venv)
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
sys.path.insert(0, str(VENV_PATH))
import chromadb
from sentence_transformers import SentenceTransformer
# Constants
SKILL_DIR = Path(__file__).parent.parent
SOURCES_FILE = SKILL_DIR / "references" / "sources.json"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
DOCS_CACHE_DIR = DATA_DIR / "docs-cache"
MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "docs"
# Chunking parameters
CHUNK_SIZE = 500 # Target tokens (roughly 4 chars per token)
CHUNK_OVERLAP = 50  # Overlap between chunks (applied as the last CHUNK_OVERLAP // 10 lines)
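
# The sources file (references/sources.json) is expected to look roughly like
# this; the values are illustrative, but the keys are the ones this script reads:
# {
#   "sources": [
#     {
#       "id": "k0s",
#       "name": "k0s documentation",
#       "type": "git",
#       "url": "https://github.com/k0sproject/k0s",
#       "version": "HEAD",
#       "path": "docs",
#       "glob": "**/*.md",
#       "base_url": "https://docs.k0sproject.io"
#     }
#   ]
# }
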
def load_sources() -> list[dict]:
"""Load configured documentation sources."""
if not SOURCES_FILE.exists():
return []
with open(SOURCES_FILE) as f:
data = json.load(f)
return data.get("sources", [])
def save_sources(sources: list[dict]) -> None:
"""Save documentation sources."""
SOURCES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SOURCES_FILE, "w") as f:
json.dump({"sources": sources}, f, indent=2)
def fetch_git_source(source: dict, quiet: bool = False) -> Optional[Path]:
"""
Clone or update a git repository.
Returns:
Path to the docs directory within the repo
"""
source_id = source["id"]
url = source["url"]
version = source.get("version", "HEAD")
doc_path = source.get("path", "")
cache_dir = DOCS_CACHE_DIR / source_id
if cache_dir.exists():
# Update existing repo
if not quiet:
print(f" Updating {source_id}...")
try:
subprocess.run(
["git", "fetch", "--all"],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "checkout", version],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "pull", "--ff-only"],
cwd=cache_dir,
capture_output=True,
check=False # May fail on tags
)
except subprocess.CalledProcessError as e:
print(f" Warning: Could not update {source_id}: {e}", file=sys.stderr)
else:
# Clone new repo
if not quiet:
print(f" Cloning {source_id}...")
cache_dir.parent.mkdir(parents=True, exist_ok=True)
try:
subprocess.run(
["git", "clone", "--depth", "1", url, str(cache_dir)],
capture_output=True,
check=True
)
if version != "HEAD":
subprocess.run(
["git", "fetch", "--depth", "1", "origin", version],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "checkout", version],
cwd=cache_dir,
capture_output=True,
check=True
)
except subprocess.CalledProcessError as e:
print(f" Error: Could not clone {source_id}: {e}", file=sys.stderr)
return None
docs_dir = cache_dir / doc_path if doc_path else cache_dir
return docs_dir if docs_dir.exists() else None
def chunk_markdown(content: str, file_path: str) -> Generator[tuple[str, dict], None, None]:
"""
Chunk markdown content for embedding.
Strategy:
- Split by headers to preserve context
- Chunk sections that are too long
- Preserve header hierarchy in metadata
"""
lines = content.split("\n")
current_chunk = []
current_headers = []
chunk_start_line = 0
def emit_chunk() -> Optional[tuple[str, dict]]:
if not current_chunk:
return None
text = "\n".join(current_chunk).strip()
if len(text) < 20:
return None
metadata = {
"file": file_path,
"headers": " > ".join(current_headers) if current_headers else ""
}
return (text, metadata)
for i, line in enumerate(lines):
# Check for header
header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
if header_match:
# Emit current chunk before new header
chunk = emit_chunk()
if chunk:
yield chunk
current_chunk = []
# Update header hierarchy
level = len(header_match.group(1))
header_text = header_match.group(2).strip()
# Trim headers to current level
current_headers = current_headers[:level-1]
current_headers.append(header_text)
chunk_start_line = i
current_chunk.append(line)
# Check if chunk is getting too large (rough token estimate)
chunk_text = "\n".join(current_chunk)
if len(chunk_text) > CHUNK_SIZE * 4:
chunk = emit_chunk()
if chunk:
yield chunk
            # Start new chunk, carrying the last CHUNK_OVERLAP // 10 lines as overlap
overlap_lines = current_chunk[-CHUNK_OVERLAP // 10:] if len(current_chunk) > CHUNK_OVERLAP // 10 else []
current_chunk = overlap_lines
# Emit final chunk
chunk = emit_chunk()
if chunk:
yield chunk
def index_source(
source: dict,
model: SentenceTransformer,
quiet: bool = False
) -> tuple[list[str], list[list[float]], list[dict], list[str]]:
"""
Index a single documentation source.
Returns:
(chunks, embeddings, metadatas, ids)
"""
source_id = source["id"]
source_type = source.get("type", "git")
glob_pattern = source.get("glob", "**/*.md")
if source_type == "git":
docs_dir = fetch_git_source(source, quiet=quiet)
if not docs_dir:
return [], [], [], []
elif source_type == "local":
docs_dir = Path(source["path"]).expanduser()
if not docs_dir.exists():
print(f" Warning: Local path does not exist: {docs_dir}", file=sys.stderr)
return [], [], [], []
else:
print(f" Warning: Unknown source type: {source_type}", file=sys.stderr)
return [], [], [], []
chunks = []
metadatas = []
ids = []
# Find and process files
files = list(docs_dir.glob(glob_pattern))
if not quiet:
print(f" Found {len(files)} files matching {glob_pattern}")
for file_path in files:
try:
content = file_path.read_text(encoding="utf-8", errors="ignore")
except IOError:
continue
rel_path = str(file_path.relative_to(docs_dir))
full_path = f"{source_id}/{rel_path}"
for chunk_text, metadata in chunk_markdown(content, full_path):
chunk_id = f"docs_{source_id}_{len(chunks)}"
chunks.append(chunk_text)
metadata["source_id"] = source_id
metadata["source_name"] = source.get("name", source_id)
if source.get("version"):
metadata["version"] = source["version"]
if source.get("base_url"):
metadata["url"] = source["base_url"]
metadatas.append(metadata)
ids.append(chunk_id)
if not quiet:
print(f" Indexed {len(chunks)} chunks from {source_id}")
return chunks, [], metadatas, ids
def index_docs(
source_id: Optional[str] = None,
all_sources: bool = False,
quiet: bool = False
) -> dict:
"""
Index documentation sources.
Args:
source_id: Index only this source
all_sources: Index all configured sources
quiet: Suppress progress output
Returns:
Summary statistics
"""
sources = load_sources()
if not sources:
return {"error": "No documentation sources configured"}
# Filter sources
if source_id:
sources = [s for s in sources if s["id"] == source_id]
if not sources:
return {"error": f"Source not found: {source_id}"}
elif not all_sources:
return {"error": "Specify --source <id> or --all"}
if not quiet:
print(f"Indexing {len(sources)} documentation source(s)")
# Initialize model and client
model = SentenceTransformer(MODEL_NAME)
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
# Get or create collection
try:
collection = client.get_collection(COLLECTION_NAME)
        # Handle existing data: a full reindex drops the collection, while a
        # single-source reindex removes just that source's chunks so re-adding
        # them does not collide on IDs
        if all_sources:
            client.delete_collection(COLLECTION_NAME)
            collection = client.create_collection(
                name=COLLECTION_NAME,
                metadata={"description": "External documentation"}
            )
        elif source_id:
            collection.delete(where={"source_id": source_id})
except Exception:
collection = client.create_collection(
name=COLLECTION_NAME,
metadata={"description": "External documentation"}
)
# Process each source
all_chunks = []
all_metadatas = []
all_ids = []
for source in sources:
if not quiet:
print(f"\nProcessing: {source['name']}")
chunks, _, metadatas, ids = index_source(source, model, quiet=quiet)
all_chunks.extend(chunks)
all_metadatas.extend(metadatas)
all_ids.extend(ids)
# Update last_indexed timestamp
source["last_indexed"] = datetime.now().isoformat()
# Batch embed and add to collection
if all_chunks:
if not quiet:
print(f"\nEmbedding {len(all_chunks)} chunks...")
embeddings = model.encode(all_chunks, show_progress_bar=not quiet).tolist()
# Add in batches
batch_size = 100
for i in range(0, len(all_chunks), batch_size):
end_idx = min(i + batch_size, len(all_chunks))
collection.add(
documents=all_chunks[i:end_idx],
embeddings=embeddings[i:end_idx],
metadatas=all_metadatas[i:end_idx],
ids=all_ids[i:end_idx]
)
    # Save updated sources with timestamps (use a new name to avoid shadowing
    # the all_sources flag)
    configured_sources = load_sources()
    for source in sources:
        for s in configured_sources:
            if s["id"] == source["id"]:
                s["last_indexed"] = source["last_indexed"]
                break
    save_sources(configured_sources)
stats = {
"collection": COLLECTION_NAME,
"sources_processed": len(sources),
"chunks_indexed": len(all_chunks),
"indexed_at": datetime.now().isoformat()
}
if not quiet:
print(f"\nIndexed {len(all_chunks)} chunks from {len(sources)} source(s)")
return stats
def main():
parser = argparse.ArgumentParser(
description="Index external documentation for RAG search"
)
parser.add_argument(
"--source", "-s",
help="Index only this source ID"
)
parser.add_argument(
"--all", "-a",
action="store_true",
dest="all_sources",
help="Index all configured sources"
)
parser.add_argument(
"--quiet", "-q",
action="store_true",
help="Suppress progress output"
)
parser.add_argument(
"--list", "-l",
action="store_true",
help="List configured sources"
)
parser.add_argument(
"--stats",
action="store_true",
help="Output stats as JSON"
)
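    # Typical invocations (source IDs come from sources.json):
    #   index_docs.py --list
    #   index_docs.py --all --stats
    #   index_docs.py --source k0s --quiet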
args = parser.parse_args()
if args.list:
sources = load_sources()
if sources:
print(json.dumps(sources, indent=2))
else:
print("No documentation sources configured")
print(f"Add sources with: add_doc_source.py")
return
stats = index_docs(
source_id=args.source,
all_sources=args.all_sources,
quiet=args.quiet
)
    if args.stats or "error" in stats:
        print(json.dumps(stats, indent=2))
    if "error" in stats:
        sys.exit(1)
if __name__ == "__main__":
main()