Implement rag-search skill for semantic search

Add new skill for semantic search across personal state files and
external documentation using ChromaDB and sentence-transformers.

Components:
- search.py: Main search interface (--index, --top-k flags)
- index_personal.py: Index ~/.claude/state files
- index_docs.py: Index external docs (git repos)
- add_doc_source.py: Manage doc sources
- test_rag.py: Test suite (5/5 passing)

Features:
- Two indexes: personal (116 chunks) and docs (k0s: 846 chunks)
- all-MiniLM-L6-v2 embeddings (384 dimensions)
- ChromaDB persistent storage
- JSON output with ranked results and metadata

Documentation:
- Added to component-registry.json with triggers
- Added /rag command alias
- Updated skills/README.md
- Resolved fc-013 (vector database for agent memory)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Author: OpenCode Test
Date: 2026-01-04 23:41:38 -08:00
Parent: c21b152de8
Commit: 7ca8caeecb
11 changed files with 1781 additions and 155 deletions

skills/rag-search/SKILL.md (new file, 123 lines)

---
name: rag-search
description: Semantic search across personal state files and external documentation
triggers: [search, find, lookup, what did, how did, when did, past decisions, previous, documentation, docs]
---
# RAG Search Skill
Semantic search across two indexes:
- **personal**: Your state files, memory, decisions, preferences
- **docs**: External documentation (k0s, ArgoCD, etc.)
## When to Use
- "What decisions did I make about X?"
- "How did I configure Y?"
- "What does the k0s documentation say about Z?"
- "Find my past notes on..."
- Cross-referencing personal context with official docs
## Scripts
All scripts use the venv at `~/.claude/skills/rag-search/venv/`.
### Search (Primary Interface)
```bash
# Search both indexes
~/.claude/skills/rag-search/venv/bin/python \
~/.claude/skills/rag-search/scripts/search.py "query"
# Search specific index
~/.claude/skills/rag-search/scripts/search.py --index personal "query"
~/.claude/skills/rag-search/scripts/search.py --index docs "query"
# Control result count
~/.claude/skills/rag-search/scripts/search.py --top-k 10 "query"
```
### Index Management
```bash
# Reindex personal state files
~/.claude/skills/rag-search/venv/bin/python \
~/.claude/skills/rag-search/scripts/index_personal.py
# Index all doc sources
~/.claude/skills/rag-search/venv/bin/python \
~/.claude/skills/rag-search/scripts/index_docs.py --all
# Index specific doc source
~/.claude/skills/rag-search/scripts/index_docs.py --source k0s
```
### Adding Doc Sources
```bash
# Add a git-based doc source
~/.claude/skills/rag-search/venv/bin/python \
~/.claude/skills/rag-search/scripts/add_doc_source.py \
--id "argocd" \
--name "ArgoCD Documentation" \
--type git \
--url "https://github.com/argoproj/argo-cd.git" \
--path "docs/" \
--glob "**/*.md"
# List configured sources
~/.claude/skills/rag-search/scripts/add_doc_source.py --list
```
## Output Format
Search returns JSON:
```json
{
"query": "your search query",
"results": [
{
"rank": 1,
"score": 0.847,
"source": "personal",
"file": "memory/decisions.json",
"chunk": "Relevant text content...",
"metadata": {"date": "2025-01-15"}
}
],
"searched_collections": ["personal", "docs"],
"total_chunks_searched": 1847
}
```
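To pull the top hit out of that JSON in a shell pipeline, something like the following works (an illustrative sketch; assumes `jq` is installed):
```bash
# Grab the highest-ranked chunk and its source file from the search output
~/.claude/skills/rag-search/venv/bin/python \
  ~/.claude/skills/rag-search/scripts/search.py "ArgoCD sync configuration" \
  | jq -r '.results[0] | "\(.file): \(.chunk)"'
```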
## Search Strategy
1. **Start broad** - Use general terms first
2. **Refine if needed** - Add specific keywords if results aren't relevant
3. **Cross-reference** - When both personal and docs results appear, synthesize them
4. **Cite sources** - Include file paths and dates in your answers
## Example Workflow
User asks: "How should I configure ArgoCD sync?"
1. Search both indexes:
```bash
search.py "ArgoCD sync configuration"
```
2. If personal results exist, prioritize those (user's past decisions)
3. Supplement with docs results for official guidance
4. Synthesize answer:
> Based on your previous decision (decisions.json, 2025-01-15), you configured ArgoCD with auto-sync enabled but self-heal disabled. The ArgoCD docs recommend this for production environments where you want automatic deployment but manual intervention for drift correction.
## Maintenance
Indexes should be refreshed periodically:
- Personal: After significant state changes
- Docs: After tool version upgrades
A systemd timer can automate this (see design doc for setup).
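A minimal sketch of such a timer, using user-level units (unit names and the daily schedule are illustrative assumptions, not taken from the design doc):
```bash
# Hypothetical user units; adjust names and schedule as needed
mkdir -p ~/.config/systemd/user

cat > ~/.config/systemd/user/rag-reindex.service <<'EOF'
[Unit]
Description=Refresh RAG search indexes

[Service]
Type=oneshot
ExecStart=%h/.claude/skills/rag-search/venv/bin/python %h/.claude/skills/rag-search/scripts/index_personal.py --quiet
ExecStart=%h/.claude/skills/rag-search/venv/bin/python %h/.claude/skills/rag-search/scripts/index_docs.py --all --quiet
EOF

cat > ~/.config/systemd/user/rag-reindex.timer <<'EOF'
[Unit]
Description=Nightly RAG reindex

[Timer]
OnCalendar=daily
Persistent=true

[Install]
WantedBy=timers.target
EOF

systemctl --user enable --now rag-reindex.timer
```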

skills/rag-search/references/sources.json (new file, 14 lines)

{
"sources": [
{
"id": "k0s",
"name": "k0s Documentation",
"type": "git",
"url": "https://github.com/k0sproject/k0s.git",
"path": "docs/",
"glob": "**/*.md",
"version": "main",
"last_indexed": "2026-01-04T23:27:40.175671"
}
]
}

skills/rag-search/scripts/add_doc_source.py (new file, 205 lines)

#!/usr/bin/env python3
"""
RAG Search - Add Documentation Source
Adds a new documentation source to the registry.
"""
import argparse
import json
import sys
from pathlib import Path
# Constants
SKILL_DIR = Path(__file__).parent.parent
SOURCES_FILE = SKILL_DIR / "references" / "sources.json"
def load_sources() -> list[dict]:
"""Load configured documentation sources."""
if not SOURCES_FILE.exists():
return []
with open(SOURCES_FILE) as f:
data = json.load(f)
return data.get("sources", [])
def save_sources(sources: list[dict]) -> None:
"""Save documentation sources."""
SOURCES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SOURCES_FILE, "w") as f:
json.dump({"sources": sources}, f, indent=2)
def add_source(
source_id: str,
name: str,
source_type: str,
url: str = None,
path: str = None,
glob: str = "**/*.md",
version: str = None,
base_url: str = None,
) -> dict:
"""
Add a new documentation source.
Args:
source_id: Unique identifier for the source
name: Human-readable name
source_type: "git" or "local"
url: Git repository URL (for git type)
path: Path within repo or local path
glob: File pattern to match
version: Git tag/branch (for git type)
base_url: Base URL for documentation links
Returns:
The created source configuration
"""
sources = load_sources()
# Check for existing source
existing = [s for s in sources if s["id"] == source_id]
if existing:
raise ValueError(f"Source already exists: {source_id}")
# Build source config
source = {
"id": source_id,
"name": name,
"type": source_type,
}
if source_type == "git":
if not url:
raise ValueError("Git sources require --url")
source["url"] = url
if version:
source["version"] = version
elif source_type == "local":
if not path:
raise ValueError("Local sources require --path")
source["path"] = str(Path(path).expanduser())
else:
raise ValueError(f"Unknown source type: {source_type}")
if path and source_type == "git":
source["path"] = path
source["glob"] = glob
if base_url:
source["base_url"] = base_url
sources.append(source)
save_sources(sources)
return source
def remove_source(source_id: str) -> bool:
"""Remove a documentation source."""
sources = load_sources()
original_count = len(sources)
sources = [s for s in sources if s["id"] != source_id]
if len(sources) == original_count:
return False
save_sources(sources)
return True
def main():
parser = argparse.ArgumentParser(
description="Add or manage documentation sources for RAG search",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Add k0s documentation from GitHub
%(prog)s --id k0s --name "k0s Documentation" --type git \\
--url "https://github.com/k0sproject/k0s.git" \\
--path "docs/" --version "v1.30.0"
# Add local documentation directory
%(prog)s --id internal --name "Internal Docs" --type local \\
--path "~/docs/internal" --glob "**/*.md"
# Remove a source
%(prog)s --remove k0s
# List sources
%(prog)s --list
"""
)
parser.add_argument("--id", help="Unique source identifier")
parser.add_argument("--name", help="Human-readable name")
parser.add_argument(
"--type", "-t",
choices=["git", "local"],
default="git",
help="Source type (default: git)"
)
parser.add_argument("--url", help="Git repository URL")
parser.add_argument("--path", help="Path within repo or local directory")
parser.add_argument(
"--glob", "-g",
default="**/*.md",
help="File pattern to match (default: **/*.md)"
)
parser.add_argument("--version", "-v", help="Git tag or branch")
parser.add_argument("--base-url", help="Base URL for documentation links")
parser.add_argument(
"--remove", "-r",
metavar="ID",
help="Remove a source by ID"
)
parser.add_argument(
"--list", "-l",
action="store_true",
help="List configured sources"
)
args = parser.parse_args()
if args.list:
sources = load_sources()
if sources:
print(json.dumps(sources, indent=2))
else:
print("No documentation sources configured")
return
if args.remove:
if remove_source(args.remove):
print(f"Removed source: {args.remove}")
else:
print(f"Source not found: {args.remove}", file=sys.stderr)
sys.exit(1)
return
# Adding a new source
if not args.id or not args.name:
parser.error("--id and --name are required when adding a source")
try:
source = add_source(
source_id=args.id,
name=args.name,
source_type=args.type,
url=args.url,
path=args.path,
glob=args.glob,
version=args.version,
base_url=args.base_url,
)
print(f"Added source: {args.id}")
print(json.dumps(source, indent=2))
print(f"\nTo index this source, run:")
print(f" index_docs.py --source {args.id}")
except ValueError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

skills/rag-search/scripts/index_docs.py (new file, 419 lines)

#!/usr/bin/env python3
"""
RAG Search - Documentation Index Builder
Indexes external documentation sources for semantic search.
Supports git repos and local directories.
"""
import argparse
import json
import os
import re
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator, Optional
# Add venv site-packages to path
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
sys.path.insert(0, str(VENV_PATH))
import chromadb
from sentence_transformers import SentenceTransformer
# Constants
SKILL_DIR = Path(__file__).parent.parent
SOURCES_FILE = SKILL_DIR / "references" / "sources.json"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
DOCS_CACHE_DIR = DATA_DIR / "docs-cache"
MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "docs"
# Chunking parameters
CHUNK_SIZE = 500 # Target tokens (roughly 4 chars per token)
CHUNK_OVERLAP = 50
def load_sources() -> list[dict]:
"""Load configured documentation sources."""
if not SOURCES_FILE.exists():
return []
with open(SOURCES_FILE) as f:
data = json.load(f)
return data.get("sources", [])
def save_sources(sources: list[dict]) -> None:
"""Save documentation sources."""
SOURCES_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(SOURCES_FILE, "w") as f:
json.dump({"sources": sources}, f, indent=2)
def fetch_git_source(source: dict, quiet: bool = False) -> Optional[Path]:
"""
Clone or update a git repository.
Returns:
Path to the docs directory within the repo
"""
source_id = source["id"]
url = source["url"]
version = source.get("version", "HEAD")
doc_path = source.get("path", "")
cache_dir = DOCS_CACHE_DIR / source_id
if cache_dir.exists():
# Update existing repo
if not quiet:
print(f" Updating {source_id}...")
try:
subprocess.run(
["git", "fetch", "--all"],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "checkout", version],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "pull", "--ff-only"],
cwd=cache_dir,
capture_output=True,
check=False # May fail on tags
)
except subprocess.CalledProcessError as e:
print(f" Warning: Could not update {source_id}: {e}", file=sys.stderr)
else:
# Clone new repo
if not quiet:
print(f" Cloning {source_id}...")
cache_dir.parent.mkdir(parents=True, exist_ok=True)
try:
subprocess.run(
["git", "clone", "--depth", "1", url, str(cache_dir)],
capture_output=True,
check=True
)
if version != "HEAD":
subprocess.run(
["git", "fetch", "--depth", "1", "origin", version],
cwd=cache_dir,
capture_output=True,
check=True
)
subprocess.run(
["git", "checkout", version],
cwd=cache_dir,
capture_output=True,
check=True
)
except subprocess.CalledProcessError as e:
print(f" Error: Could not clone {source_id}: {e}", file=sys.stderr)
return None
docs_dir = cache_dir / doc_path if doc_path else cache_dir
return docs_dir if docs_dir.exists() else None
def chunk_markdown(content: str, file_path: str) -> Generator[tuple[str, dict], None, None]:
"""
Chunk markdown content for embedding.
Strategy:
- Split by headers to preserve context
- Chunk sections that are too long
- Preserve header hierarchy in metadata
"""
lines = content.split("\n")
current_chunk = []
current_headers = []
chunk_start_line = 0
def emit_chunk() -> Optional[tuple[str, dict]]:
if not current_chunk:
return None
text = "\n".join(current_chunk).strip()
if len(text) < 20:
return None
metadata = {
"file": file_path,
"headers": " > ".join(current_headers) if current_headers else ""
}
return (text, metadata)
for i, line in enumerate(lines):
# Check for header
header_match = re.match(r'^(#{1,6})\s+(.+)$', line)
if header_match:
# Emit current chunk before new header
chunk = emit_chunk()
if chunk:
yield chunk
current_chunk = []
# Update header hierarchy
level = len(header_match.group(1))
header_text = header_match.group(2).strip()
# Trim headers to current level
current_headers = current_headers[:level-1]
current_headers.append(header_text)
chunk_start_line = i
current_chunk.append(line)
# Check if chunk is getting too large (rough token estimate)
chunk_text = "\n".join(current_chunk)
if len(chunk_text) > CHUNK_SIZE * 4:
chunk = emit_chunk()
if chunk:
yield chunk
# Start new chunk with overlap
overlap_lines = current_chunk[-CHUNK_OVERLAP // 10:] if len(current_chunk) > CHUNK_OVERLAP // 10 else []
current_chunk = overlap_lines
# Emit final chunk
chunk = emit_chunk()
if chunk:
yield chunk
def index_source(
source: dict,
model: SentenceTransformer,
quiet: bool = False
) -> tuple[list[str], list[list[float]], list[dict], list[str]]:
"""
Index a single documentation source.
Returns:
(chunks, embeddings, metadatas, ids)
"""
source_id = source["id"]
source_type = source.get("type", "git")
glob_pattern = source.get("glob", "**/*.md")
if source_type == "git":
docs_dir = fetch_git_source(source, quiet=quiet)
if not docs_dir:
return [], [], [], []
elif source_type == "local":
docs_dir = Path(source["path"]).expanduser()
if not docs_dir.exists():
print(f" Warning: Local path does not exist: {docs_dir}", file=sys.stderr)
return [], [], [], []
else:
print(f" Warning: Unknown source type: {source_type}", file=sys.stderr)
return [], [], [], []
chunks = []
metadatas = []
ids = []
# Find and process files
files = list(docs_dir.glob(glob_pattern))
if not quiet:
print(f" Found {len(files)} files matching {glob_pattern}")
for file_path in files:
try:
content = file_path.read_text(encoding="utf-8", errors="ignore")
except IOError:
continue
rel_path = str(file_path.relative_to(docs_dir))
full_path = f"{source_id}/{rel_path}"
for chunk_text, metadata in chunk_markdown(content, full_path):
chunk_id = f"docs_{source_id}_{len(chunks)}"
chunks.append(chunk_text)
metadata["source_id"] = source_id
metadata["source_name"] = source.get("name", source_id)
if source.get("version"):
metadata["version"] = source["version"]
if source.get("base_url"):
metadata["url"] = source["base_url"]
metadatas.append(metadata)
ids.append(chunk_id)
if not quiet:
print(f" Indexed {len(chunks)} chunks from {source_id}")
return chunks, [], metadatas, ids
def index_docs(
source_id: Optional[str] = None,
all_sources: bool = False,
quiet: bool = False
) -> dict:
"""
Index documentation sources.
Args:
source_id: Index only this source
all_sources: Index all configured sources
quiet: Suppress progress output
Returns:
Summary statistics
"""
sources = load_sources()
if not sources:
return {"error": "No documentation sources configured"}
# Filter sources
if source_id:
sources = [s for s in sources if s["id"] == source_id]
if not sources:
return {"error": f"Source not found: {source_id}"}
elif not all_sources:
return {"error": "Specify --source <id> or --all"}
if not quiet:
print(f"Indexing {len(sources)} documentation source(s)")
# Initialize model and client
model = SentenceTransformer(MODEL_NAME)
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
# Get or create collection
try:
collection = client.get_collection(COLLECTION_NAME)
# If indexing all or specific source, we'll need to handle existing data
if all_sources:
client.delete_collection(COLLECTION_NAME)
collection = client.create_collection(
name=COLLECTION_NAME,
metadata={"description": "External documentation"}
)
except Exception:
collection = client.create_collection(
name=COLLECTION_NAME,
metadata={"description": "External documentation"}
)
# Process each source
all_chunks = []
all_metadatas = []
all_ids = []
for source in sources:
if not quiet:
print(f"\nProcessing: {source['name']}")
chunks, _, metadatas, ids = index_source(source, model, quiet=quiet)
all_chunks.extend(chunks)
all_metadatas.extend(metadatas)
all_ids.extend(ids)
# Update last_indexed timestamp
source["last_indexed"] = datetime.now().isoformat()
# Batch embed and add to collection
if all_chunks:
if not quiet:
print(f"\nEmbedding {len(all_chunks)} chunks...")
embeddings = model.encode(all_chunks, show_progress_bar=not quiet).tolist()
# Add in batches
batch_size = 100
for i in range(0, len(all_chunks), batch_size):
end_idx = min(i + batch_size, len(all_chunks))
collection.add(
documents=all_chunks[i:end_idx],
embeddings=embeddings[i:end_idx],
metadatas=all_metadatas[i:end_idx],
ids=all_ids[i:end_idx]
)
# Save updated sources with timestamps
all_sources = load_sources()
for source in sources:
for s in all_sources:
if s["id"] == source["id"]:
s["last_indexed"] = source["last_indexed"]
break
save_sources(all_sources)
stats = {
"collection": COLLECTION_NAME,
"sources_processed": len(sources),
"chunks_indexed": len(all_chunks),
"indexed_at": datetime.now().isoformat()
}
if not quiet:
print(f"\nIndexed {len(all_chunks)} chunks from {len(sources)} source(s)")
return stats
def main():
parser = argparse.ArgumentParser(
description="Index external documentation for RAG search"
)
parser.add_argument(
"--source", "-s",
help="Index only this source ID"
)
parser.add_argument(
"--all", "-a",
action="store_true",
dest="all_sources",
help="Index all configured sources"
)
parser.add_argument(
"--quiet", "-q",
action="store_true",
help="Suppress progress output"
)
parser.add_argument(
"--list", "-l",
action="store_true",
help="List configured sources"
)
parser.add_argument(
"--stats",
action="store_true",
help="Output stats as JSON"
)
args = parser.parse_args()
if args.list:
sources = load_sources()
if sources:
print(json.dumps(sources, indent=2))
else:
print("No documentation sources configured")
print(f"Add sources with: add_doc_source.py")
return
stats = index_docs(
source_id=args.source,
all_sources=args.all_sources,
quiet=args.quiet
)
if args.stats or "error" in stats:
print(json.dumps(stats, indent=2))
if __name__ == "__main__":
main()

skills/rag-search/scripts/index_personal.py (new file, 286 lines)

#!/usr/bin/env python3
"""
RAG Search - Personal Index Builder
Indexes ~/.claude/state files for semantic search.
Chunks JSON files by key for optimal retrieval.
"""
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Generator
# Add venv site-packages to path
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
sys.path.insert(0, str(VENV_PATH))
import chromadb
from sentence_transformers import SentenceTransformer
# Constants
STATE_DIR = Path.home() / ".claude" / "state"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
MODEL_NAME = "all-MiniLM-L6-v2"
COLLECTION_NAME = "personal"
def chunk_json_file(file_path: Path) -> Generator[tuple[str, dict], None, None]:
"""
Chunk a JSON file into searchable segments.
Strategy:
- Arrays: Each item becomes a chunk
- Objects with arrays: Each array item with parent context
- Nested objects: Flatten with path prefix
Yields:
(chunk_text, metadata) tuples
"""
try:
with open(file_path) as f:
data = json.load(f)
except (json.JSONDecodeError, IOError) as e:
print(f" Warning: Could not parse {file_path}: {e}", file=sys.stderr)
return
rel_path = str(file_path.relative_to(STATE_DIR))
base_metadata = {"file": rel_path}
def process_item(item: dict, context: str = "") -> Generator[tuple[str, dict], None, None]:
"""Process a single item from JSON structure."""
if isinstance(item, dict):
# Check for common patterns in our state files
# Memory items (decisions, preferences, facts, projects)
if "content" in item:
text_parts = []
if context:
text_parts.append(f"[{context}]")
text_parts.append(item.get("content", ""))
if item.get("context"):
text_parts.append(f"Context: {item['context']}")
if item.get("rationale"):
text_parts.append(f"Rationale: {item['rationale']}")
metadata = {**base_metadata}
if item.get("date"):
metadata["date"] = item["date"]
if item.get("id"):
metadata["id"] = item["id"]
if item.get("status"):
metadata["status"] = item["status"]
yield (" ".join(text_parts), metadata)
return
# General instructions (memory)
if "instruction" in item:
text_parts = [item["instruction"]]
metadata = {**base_metadata}
if item.get("added"):
metadata["date"] = item["added"]
if item.get("status"):
metadata["status"] = item["status"]
yield (" ".join(text_parts), metadata)
return
# Knowledge base entries
if "fact" in item or "answer" in item:
text = item.get("fact") or item.get("answer", "")
if item.get("question"):
text = f"Q: {item['question']} A: {text}"
metadata = {**base_metadata}
if item.get("category"):
metadata["category"] = item["category"]
yield (text, metadata)
return
# Component registry entries
if "name" in item and "description" in item:
text = f"{item['name']}: {item['description']}"
if item.get("triggers"):
text += f" Triggers: {', '.join(item['triggers'])}"
metadata = {**base_metadata, "type": item.get("type", "unknown")}
yield (text, metadata)
return
# Future considerations
if "id" in item and "title" in item:
text = f"{item.get('id', '')}: {item['title']}"
if item.get("description"):
text += f" - {item['description']}"
if item.get("rationale"):
text += f" Rationale: {item['rationale']}"
metadata = {**base_metadata}
if item.get("date_added"):
metadata["date"] = item["date_added"]
if item.get("status"):
metadata["status"] = item["status"]
yield (text, metadata)
return
# System instructions - processes
if "process" in item or "name" in item:
parts = []
if item.get("name"):
parts.append(item["name"])
if item.get("description"):
parts.append(item["description"])
if item.get("steps"):
parts.append("Steps: " + " ".join(item["steps"]))
if parts:
yield (" - ".join(parts), {**base_metadata})
return
# Fallback: stringify the whole object
text = json.dumps(item, indent=None)
if len(text) > 50: # Only index if substantial
yield (text[:1000], {**base_metadata}) # Truncate very long items
elif isinstance(item, str) and len(item) > 20:
yield (item, {**base_metadata})
# Process top-level structure
if isinstance(data, list):
for item in data:
yield from process_item(item)
elif isinstance(data, dict):
# Handle nested arrays within objects
for key, value in data.items():
if isinstance(value, list):
for item in value:
yield from process_item(item, context=key)
elif isinstance(value, dict):
yield from process_item(value, context=key)
elif isinstance(value, str) and len(value) > 20:
yield (f"{key}: {value}", {**base_metadata})
def find_json_files() -> list[Path]:
"""Find all JSON files in the state directory."""
files = []
for pattern in ["*.json", "**/*.json"]:
files.extend(STATE_DIR.glob(pattern))
return sorted(set(files))
def index_personal(quiet: bool = False, force: bool = False) -> dict:
"""
Index all personal state files.
Args:
quiet: Suppress progress output
force: Force reindex even if already exists
Returns:
Summary statistics
"""
if not quiet:
print(f"Indexing personal state from {STATE_DIR}")
# Initialize model and client
model = SentenceTransformer(MODEL_NAME)
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
# Delete and recreate collection for clean reindex
try:
client.delete_collection(COLLECTION_NAME)
except Exception:
pass
collection = client.create_collection(
name=COLLECTION_NAME,
metadata={"description": "Personal state files from ~/.claude/state"}
)
# Find and process files
files = find_json_files()
if not quiet:
print(f"Found {len(files)} JSON files")
total_chunks = 0
chunks = []
metadatas = []
ids = []
for file_path in files:
if not quiet:
print(f" Processing: {file_path.relative_to(STATE_DIR)}")
for chunk_text, metadata in chunk_json_file(file_path):
# Skip empty or very short chunks
if not chunk_text or len(chunk_text.strip()) < 10:
continue
chunk_id = f"personal_{total_chunks}"
chunks.append(chunk_text)
metadatas.append(metadata)
ids.append(chunk_id)
total_chunks += 1
# Batch embed and add to collection
if chunks:
if not quiet:
print(f"Embedding {len(chunks)} chunks...")
embeddings = model.encode(chunks, show_progress_bar=not quiet).tolist()
# Add in batches (ChromaDB has limits)
batch_size = 100
for i in range(0, len(chunks), batch_size):
end_idx = min(i + batch_size, len(chunks))
collection.add(
documents=chunks[i:end_idx],
embeddings=embeddings[i:end_idx],
metadatas=metadatas[i:end_idx],
ids=ids[i:end_idx]
)
stats = {
"collection": COLLECTION_NAME,
"files_processed": len(files),
"chunks_indexed": total_chunks,
"indexed_at": datetime.now().isoformat()
}
if not quiet:
print(f"\nIndexed {total_chunks} chunks from {len(files)} files")
return stats
def main():
parser = argparse.ArgumentParser(
description="Index personal state files for RAG search"
)
parser.add_argument(
"--quiet", "-q",
action="store_true",
help="Suppress progress output"
)
parser.add_argument(
"--force", "-f",
action="store_true",
help="Force reindex even if already indexed"
)
parser.add_argument(
"--stats",
action="store_true",
help="Output stats as JSON"
)
args = parser.parse_args()
stats = index_personal(quiet=args.quiet, force=args.force)
if args.stats:
print(json.dumps(stats, indent=2))
if __name__ == "__main__":
main()

skills/rag-search/scripts/search.py (new file, 184 lines)

#!/usr/bin/env python3
"""
RAG Search - Main search entry point
Searches personal and/or docs indexes for semantically similar content.
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Optional
# Add venv site-packages to path
VENV_PATH = Path(__file__).parent.parent / "venv" / "lib" / "python3.13" / "site-packages"
if str(VENV_PATH) not in sys.path:
sys.path.insert(0, str(VENV_PATH))
import chromadb
from sentence_transformers import SentenceTransformer
# Constants
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
CHROMA_DIR = DATA_DIR / "chroma"
MODEL_NAME = "all-MiniLM-L6-v2"
DEFAULT_TOP_K = 5
# Lazy-loaded globals
_model: Optional[SentenceTransformer] = None
_client: Optional[chromadb.PersistentClient] = None
def get_model() -> SentenceTransformer:
"""Lazy-load the embedding model."""
global _model
if _model is None:
_model = SentenceTransformer(MODEL_NAME)
return _model
def get_client() -> chromadb.PersistentClient:
"""Lazy-load the ChromaDB client."""
global _client
if _client is None:
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
_client = chromadb.PersistentClient(path=str(CHROMA_DIR))
return _client
def search(
query: str,
index: Optional[str] = None,
top_k: int = DEFAULT_TOP_K,
) -> dict:
"""
Search for semantically similar content.
Args:
query: The search query
index: Which index to search ("personal", "docs", or None for both)
top_k: Number of results to return per collection
Returns:
dict with query, results, and metadata
"""
client = get_client()
model = get_model()
# Embed the query
query_embedding = model.encode(query).tolist()
# Determine which collections to search
collections_to_search = []
if index is None or index == "personal":
try:
collections_to_search.append(("personal", client.get_collection("personal")))
except Exception:
pass # Collection doesn't exist
if index is None or index == "docs":
try:
collections_to_search.append(("docs", client.get_collection("docs")))
except Exception:
pass # Collection doesn't exist
if not collections_to_search:
return {
"query": query,
"results": [],
"searched_collections": [],
"total_chunks_searched": 0,
"error": f"No collections found for index: {index or 'any'}"
}
# Search each collection
all_results = []
total_chunks = 0
searched_collections = []
for coll_name, collection in collections_to_search:
searched_collections.append(coll_name)
count = collection.count()
total_chunks += count
if count == 0:
continue
results = collection.query(
query_embeddings=[query_embedding],
n_results=min(top_k, count),
include=["documents", "metadatas", "distances"]
)
# Process results
if results["documents"] and results["documents"][0]:
for i, (doc, metadata, distance) in enumerate(zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
)):
# Convert distance to similarity score (cosine distance to similarity)
score = 1 - (distance / 2) # Normalized for cosine distance
all_results.append({
"source": coll_name,
"file": metadata.get("file", "unknown"),
"chunk": doc,
"score": round(score, 3),
"metadata": {k: v for k, v in metadata.items() if k != "file"}
})
# Sort by score and add ranks
all_results.sort(key=lambda x: x["score"], reverse=True)
for i, result in enumerate(all_results[:top_k]):
result["rank"] = i + 1
return {
"query": query,
"results": all_results[:top_k],
"searched_collections": searched_collections,
"total_chunks_searched": total_chunks
}
def main():
parser = argparse.ArgumentParser(
description="Search the RAG index for relevant content",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s "how did I configure ArgoCD sync?"
%(prog)s --index personal "past decisions about caching"
%(prog)s --index docs "k0s node maintenance"
%(prog)s --top-k 10 "prometheus alerting rules"
"""
)
parser.add_argument("query", help="Search query")
parser.add_argument(
"--index", "-i",
choices=["personal", "docs"],
help="Search only this index (default: both)"
)
parser.add_argument(
"--top-k", "-k",
type=int,
default=DEFAULT_TOP_K,
help=f"Number of results to return (default: {DEFAULT_TOP_K})"
)
parser.add_argument(
"--raw",
action="store_true",
help="Output raw JSON (default: formatted)"
)
args = parser.parse_args()
results = search(args.query, args.index, args.top_k)
if args.raw:
print(json.dumps(results))
else:
print(json.dumps(results, indent=2))
if __name__ == "__main__":
main()

skills/rag-search/scripts/test_rag.py (new file, 230 lines)

#!/usr/bin/env python3
"""
RAG Search - Test Suite
Tests all components of the RAG search skill.
"""
import json
import subprocess
import sys
from pathlib import Path
# Constants
SKILL_DIR = Path(__file__).parent.parent
SCRIPTS_DIR = SKILL_DIR / "scripts"
VENV_PYTHON = SKILL_DIR / "venv" / "bin" / "python"
DATA_DIR = Path.home() / ".claude" / "data" / "rag-search"
def run_script(script_name: str, args: list[str] = None) -> tuple[int, str, str]:
"""Run a script and return (returncode, stdout, stderr)."""
cmd = [str(VENV_PYTHON), str(SCRIPTS_DIR / script_name)]
if args:
cmd.extend(args)
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode, result.stdout, result.stderr
def test_chromadb_embeddings():
"""Test 1: ChromaDB + embeddings working."""
print("Test 1: ChromaDB + embeddings...")
# Add venv to path and test imports
venv_path = SKILL_DIR / "venv" / "lib" / "python3.13" / "site-packages"
sys.path.insert(0, str(venv_path))
try:
import chromadb
from sentence_transformers import SentenceTransformer
# Test ChromaDB
client = chromadb.PersistentClient(path=str(DATA_DIR / "chroma"))
assert client is not None, "Failed to create ChromaDB client"
# Test embedding model
model = SentenceTransformer("all-MiniLM-L6-v2")
embedding = model.encode("test query")
assert len(embedding) == 384, f"Expected 384 dimensions, got {len(embedding)}"
print(" PASS: ChromaDB and embeddings working")
return True
except Exception as e:
print(f" FAIL: {e}")
return False
def test_personal_index():
"""Test 2: Personal index populated from ~/.claude/state."""
print("Test 2: Personal index populated...")
# Check if collection exists and has data
venv_path = SKILL_DIR / "venv" / "lib" / "python3.13" / "site-packages"
if str(venv_path) not in sys.path:
sys.path.insert(0, str(venv_path))
try:
import chromadb
client = chromadb.PersistentClient(path=str(DATA_DIR / "chroma"))
collection = client.get_collection("personal")
count = collection.count()
assert count > 0, f"Personal collection is empty (count={count})"
print(f" PASS: Personal index has {count} chunks")
return True
except Exception as e:
print(f" FAIL: {e}")
return False
def test_docs_index():
"""Test 3: At least one external doc source indexed."""
print("Test 3: External docs indexed...")
# Check if collection exists and has data
venv_path = SKILL_DIR / "venv" / "lib" / "python3.13" / "site-packages"
if str(venv_path) not in sys.path:
sys.path.insert(0, str(venv_path))
try:
import chromadb
client = chromadb.PersistentClient(path=str(DATA_DIR / "chroma"))
collection = client.get_collection("docs")
count = collection.count()
assert count > 0, f"Docs collection is empty (count={count})"
# Also verify sources.json has at least one source
sources_file = SKILL_DIR / "references" / "sources.json"
with open(sources_file) as f:
sources = json.load(f)
assert len(sources.get("sources", [])) > 0, "No sources configured"
print(f" PASS: Docs index has {count} chunks from {len(sources['sources'])} source(s)")
return True
except Exception as e:
print(f" FAIL: {e}")
return False
def test_search_returns_results():
"""Test 4: search.py returns relevant results."""
print("Test 4: Search returns relevant results...")
# Test personal search
returncode, stdout, stderr = run_script("search.py", ["--index", "personal", "decisions"])
if returncode != 0:
print(f" FAIL: Personal search failed: {stderr}")
return False
try:
result = json.loads(stdout)
personal_results = result.get("results", [])
if not personal_results:
print(" WARN: No personal results found (may be expected if state is minimal)")
except json.JSONDecodeError:
print(f" FAIL: Invalid JSON output: {stdout}")
return False
# Test docs search
returncode, stdout, stderr = run_script("search.py", ["--index", "docs", "kubernetes"])
if returncode != 0:
print(f" FAIL: Docs search failed: {stderr}")
return False
try:
result = json.loads(stdout)
docs_results = result.get("results", [])
if not docs_results:
print(" FAIL: No docs results found for 'kubernetes'")
return False
except json.JSONDecodeError:
print(f" FAIL: Invalid JSON output: {stdout}")
return False
# Test combined search
returncode, stdout, stderr = run_script("search.py", ["configuration"])
if returncode != 0:
print(f" FAIL: Combined search failed: {stderr}")
return False
try:
result = json.loads(stdout)
assert "query" in result, "Missing 'query' in output"
assert "results" in result, "Missing 'results' in output"
assert "searched_collections" in result, "Missing 'searched_collections'"
assert len(result["searched_collections"]) == 2, "Should search both collections"
except json.JSONDecodeError:
print(f" FAIL: Invalid JSON output: {stdout}")
return False
print(f" PASS: Search returns properly formatted results")
return True
def test_skill_structure():
"""Test 5: All required files exist."""
print("Test 5: Skill structure complete...")
required_files = [
SKILL_DIR / "SKILL.md",
SCRIPTS_DIR / "search.py",
SCRIPTS_DIR / "index_personal.py",
SCRIPTS_DIR / "index_docs.py",
SCRIPTS_DIR / "add_doc_source.py",
SKILL_DIR / "references" / "sources.json",
]
missing = []
for f in required_files:
if not f.exists():
missing.append(str(f.relative_to(SKILL_DIR)))
if missing:
print(f" FAIL: Missing files: {', '.join(missing)}")
return False
print(" PASS: All required files exist")
return True
def main():
print("=" * 60)
print("RAG Search Test Suite")
print("=" * 60)
print()
tests = [
test_chromadb_embeddings,
test_personal_index,
test_docs_index,
test_search_returns_results,
test_skill_structure,
]
results = []
for test in tests:
results.append(test())
print()
print("=" * 60)
print("Summary")
print("=" * 60)
passed = sum(results)
total = len(results)
print(f"Passed: {passed}/{total}")
if passed == total:
print("\nAll tests passed!")
return 0
else:
print(f"\n{total - passed} test(s) failed")
return 1
if __name__ == "__main__":
sys.exit(main())