claude-code/hooks/scripts/summarize-transcript.py

Commit 630893f047 (OpenCode Test): Add conditional RAG reindex after session summarization
When summarize-transcript.py extracts items to memory files, it now
triggers index_personal.py to update the RAG search index. The reindex
runs only when items were actually added (total_added > 0), avoiding
unnecessary reindexing after trivial sessions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 10:32:04 -08:00
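
For context, a minimal sketch of how a session-end hook might drive this script, assuming the hook payload arrives as JSON on stdin with session_id, transcript_path, and reason fields (the field names and hook wiring are assumptions, not something this commit shows):

#!/usr/bin/env python3
# Hypothetical hook wrapper: forwards the hook payload to summarize-transcript.py.
import json
import subprocess
import sys
from pathlib import Path

payload = json.load(sys.stdin)  # hook input; field names are assumed
script = Path.home() / ".claude/hooks/scripts/summarize-transcript.py"
# Fire-and-forget so summarization never blocks session teardown.
subprocess.Popen(
    [
        sys.executable, str(script),
        "--session-id", payload["session_id"],
        "--transcript", payload["transcript_path"],
        "--reason", payload.get("reason", ""),
    ],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)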


#!/usr/bin/env python3
"""
Summarize a Claude Code session transcript and extract key information to memory.
This script:
1. Loads the transcript from Claude's storage
2. Checks if session is substantive (>= 3 user messages)
3. Extracts facts/paths via heuristics
4. Uses Claude CLI (with subscription auth) for decisions/preferences if substantive
5. Updates memory files and marks session as summarized
"""
import argparse
import json
import os
import re
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
# Paths
STATE_DIR = Path.home() / ".claude/state/personal-assistant"
MEMORY_DIR = STATE_DIR / "memory"
HISTORY_INDEX = STATE_DIR / "history/index.json"
# Memory files
MEMORY_FILES = {
    "decisions": MEMORY_DIR / "decisions.json",
    "preferences": MEMORY_DIR / "preferences.json",
    "projects": MEMORY_DIR / "projects.json",
    "facts": MEMORY_DIR / "facts.json",
}
# Minimum threshold for substantive sessions
MIN_USER_MESSAGES = 3

def log(msg: str) -> None:
    """Log with timestamp."""
    print(f"[{datetime.now().isoformat()}] {msg}", file=sys.stderr)

def load_transcript(path: str) -> list[dict]:
    """Load transcript from a JSONL file, skipping malformed lines."""
    messages = []
    try:
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    messages.append(json.loads(line))
                except json.JSONDecodeError:
                    # Skip a corrupt line rather than discarding the whole transcript
                    continue
    except OSError as e:
        log(f"Error loading transcript: {e}")
    return messages

def count_user_messages(transcript: list[dict]) -> int:
    """Count the number of user messages in transcript."""
    count = 0
    for entry in transcript:
        # Claude Code format: type is "user" or "assistant" at top level
        if entry.get("type") == "user":
            count += 1
    return count
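
# For reference, transcript entries are JSONL objects shaped roughly like the
# following (simplified; the exact layout can vary across Claude Code versions):
#
#   {"type": "user", "message": {"role": "user", "content": "Fix the flaky test"}}
#   {"type": "assistant", "message": {"role": "assistant", "content": [
#       {"type": "text", "text": "Looking at it now."},
#       {"type": "tool_use", "name": "Bash", "input": {"command": "pytest"}}]}}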

def extract_conversation_text(transcript: list[dict]) -> str:
    """Extract readable conversation text from transcript."""
    parts = []
    for entry in transcript:
        entry_type = entry.get("type", "")
        # Skip non-message entries (like queue-operation)
        if entry_type not in ("user", "assistant"):
            continue
        message = entry.get("message", {})
        if not isinstance(message, dict):
            continue
        role = message.get("role", entry_type)
        content = message.get("content", "")
        # Handle different content formats
        if isinstance(content, list):
            # Assistant messages have content as an array of blocks
            text_parts = []
            for block in content:
                if isinstance(block, dict):
                    if block.get("type") == "text":
                        text_parts.append(block.get("text", ""))
                    elif block.get("type") == "tool_use":
                        text_parts.append(f"[Tool: {block.get('name', 'unknown')}]")
                elif isinstance(block, str):
                    text_parts.append(block)
            content = "\n".join(text_parts)
        elif isinstance(content, str):
            # User messages have content as a plain string
            pass
        else:
            continue
        if content:
            parts.append(f"[{role}]: {content[:2000]}")  # Truncate long messages
    return "\n\n".join(parts)

def heuristic_extraction(transcript: list[dict]) -> dict[str, list[dict]]:
    """Extract simple facts and paths using heuristics."""
    results = {"projects": [], "facts": []}
    conversation = extract_conversation_text(transcript)
    # Extract file paths mentioned
    path_pattern = r'(?:/[\w.-]+)+(?:/[\w.-]*)?'
    paths = set(re.findall(path_pattern, conversation))
    # Filter to likely project paths
    project_paths = set()
    for p in paths:
        if any(x in p for x in ['/home/', '/Users/', '/.claude/', '/projects/']):
            if not any(x in p for x in ['/proc/', '/sys/', '/dev/', '/tmp/']):
                project_paths.add(p)
    # Add unique project paths as context (sorted for deterministic selection)
    for path in sorted(project_paths)[:5]:  # Limit to 5 paths
        results["projects"].append({
            "content": f"Worked with path: {path}",
            "context": "File path referenced in session"
        })
    # Extract tool/environment facts
    tool_patterns = [
        (r'using\s+([\w-]+)\s+version\s+([\d.]+)', "Tool version: {0} {1}"),
        (r'(python|node|npm|pip)\s+.*?([\d.]+)', "Runtime: {0} {1}"),
    ]
    for pattern, template in tool_patterns:
        matches = re.findall(pattern, conversation, re.IGNORECASE)
        for match in matches[:2]:  # Limit matches
            results["facts"].append({
                "content": template.format(*match),
                "context": "Environment fact from session"
            })
    return results
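
# Illustrative heuristic output for a session that touched one repo and
# mentioned a runtime version:
#
#   {"projects": [{"content": "Worked with path: /home/alice/projects/app",
#                  "context": "File path referenced in session"}],
#    "facts": [{"content": "Runtime: python 3.12",
#               "context": "Environment fact from session"}]}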

def llm_extraction(conversation_text: str, session_id: str) -> dict[str, list[dict]]:
    """Use Claude CLI to extract decisions and preferences."""
    results = {"decisions": [], "preferences": []}
    # Check if claude CLI is available
    claude_path = subprocess.run(
        ["which", "claude"], capture_output=True, text=True
    ).stdout.strip()
    if not claude_path:
        log("Claude CLI not found, skipping LLM extraction")
        return results
    prompt = f"""Analyze this conversation excerpt and extract key information.

CONVERSATION:
{conversation_text[:15000]}

Extract and return a JSON object with:
1. "decisions": List of decisions made (choices, directions taken, approaches selected)
2. "preferences": List of user preferences learned (likes, dislikes, preferred approaches)

For each item include:
- "content": Brief description (1 sentence)
- "context": Why this matters or additional context

Only include genuinely significant items. Skip trivial or obvious things.
Return valid JSON only, no markdown formatting.

Example format:
{{"decisions": [{{"content": "Use PostgreSQL for the database", "context": "Chosen for JSONB support"}}], "preferences": [{{"content": "Prefers explicit type annotations", "context": "For code clarity"}}]}}"""
    try:
        # Use claude CLI in print mode with haiku for cost efficiency
        result = subprocess.run(
            [
                claude_path, "-p",
                "--model", "haiku",
                "--no-session-persistence",
                prompt
            ],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(Path.home())  # Run from home to avoid project context
        )
        if result.returncode != 0:
            log(f"Claude CLI failed: {result.stderr[:500]}")
            return results
        response_text = result.stdout.strip()
        # Try to extract JSON from response
        try:
            # Handle potential markdown code blocks
            if "```" in response_text:
                json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
                if json_match:
                    response_text = json_match.group(1)
            data = json.loads(response_text)
            for key in ["decisions", "preferences"]:
                if key in data and isinstance(data[key], list):
                    for item in data[key][:5]:  # Limit to 5 per category
                        if isinstance(item, dict) and "content" in item:
                            results[key].append({
                                "content": item["content"],
                                "context": item.get("context", "")
                            })
        except json.JSONDecodeError as e:
            log(f"Failed to parse LLM response as JSON: {e}")
            log(f"Response was: {response_text[:500]}")
    except subprocess.TimeoutExpired:
        log("Claude CLI timed out")
    except Exception as e:
        log(f"LLM extraction error: {e}")
    return results

def load_memory_file(path: Path) -> dict:
    """Load a memory file, creating default structure if needed."""
    if path.exists():
        try:
            with open(path) as f:
                return json.load(f)
        except json.JSONDecodeError:
            pass
    # Default structure
    return {
        "version": "1.0",
        "description": f"{path.stem.title()} extracted from sessions",
        "items": []
    }

def save_memory_file(path: Path, data: dict) -> None:
    """Save a memory file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(data, f, indent=2)
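
# On disk, a memory file ends up shaped like this (values illustrative):
#
#   {
#     "version": "1.0",
#     "description": "Decisions extracted from sessions",
#     "items": [
#       {"id": "<uuid4>", "date": "2026-01-07",
#        "content": "Use PostgreSQL for the database",
#        "context": "Chosen for JSONB support",
#        "session": "<session-id>"}
#     ]
#   }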

def add_to_memory(category: str, items: list[dict], session_id: str) -> int:
    """Add items to a memory category. Returns count of items added."""
    if not items:
        return 0
    path = MEMORY_FILES.get(category)
    if not path:
        return 0
    data = load_memory_file(path)
    data.setdefault("items", [])  # tolerate hand-edited files missing the key
    today = datetime.now().strftime("%Y-%m-%d")
    # Check for duplicates based on content
    existing_content = {item.get("content", "").lower() for item in data["items"]}
    added = 0
    for item in items:
        content = item.get("content", "")
        if content and content.lower() not in existing_content:
            data["items"].append({
                "id": str(uuid.uuid4()),
                "date": today,
                "content": content,
                "context": item.get("context", ""),
                "session": session_id
            })
            existing_content.add(content.lower())
            added += 1
    if added > 0:
        save_memory_file(path, data)
    return added

def update_history_index(session_id: str, transcript_path: str, topics: list[str]) -> None:
    """Mark session as summarized in history index."""
    if not HISTORY_INDEX.exists():
        log(f"History index not found: {HISTORY_INDEX}")
        return
    try:
        with open(HISTORY_INDEX) as f:
            data = json.load(f)
        # Find and update the session
        for session in data.get("sessions", []):
            if session.get("id") == session_id:
                session["summarized"] = True
                session["transcript_path"] = transcript_path
                session["topics"] = topics[:5]  # Limit topics
                session["summarized_at"] = datetime.now().isoformat()
                break
        with open(HISTORY_INDEX, "w") as f:
            json.dump(data, f, indent=2)
        log(f"Updated history index for session {session_id}")
    except Exception as e:
        log(f"Error updating history index: {e}")

def main():
    parser = argparse.ArgumentParser(description="Summarize a Claude Code session")
    parser.add_argument("--session-id", required=True, help="Session ID")
    parser.add_argument("--transcript", required=True, help="Path to transcript file")
    parser.add_argument("--reason", default="", help="Session end reason")
    args = parser.parse_args()
    log(f"Starting summarization for session {args.session_id}")
    # Load transcript
    transcript = load_transcript(args.transcript)
    if not transcript:
        log("Empty or invalid transcript, skipping")
        return
    # Check threshold
    user_msg_count = count_user_messages(transcript)
    log(f"Found {user_msg_count} user messages")
    if user_msg_count < MIN_USER_MESSAGES:
        log(f"Session below threshold ({MIN_USER_MESSAGES}), marking as summarized without extraction")
        update_history_index(args.session_id, args.transcript, ["trivial"])
        return
    # Extract conversation text
    conversation_text = extract_conversation_text(transcript)
    # Heuristic extraction (always run)
    log("Running heuristic extraction...")
    heuristic_results = heuristic_extraction(transcript)
    # LLM extraction (for substantive sessions)
    log("Running LLM extraction...")
    llm_results = llm_extraction(conversation_text, args.session_id)
    # Combine results
    all_results = {
        "decisions": llm_results.get("decisions", []),
        "preferences": llm_results.get("preferences", []),
        "projects": heuristic_results.get("projects", []),
        "facts": heuristic_results.get("facts", []),
    }
    # Save to memory files
    total_added = 0
    topics = []
    for category, items in all_results.items():
        count = add_to_memory(category, items, args.session_id)
        total_added += count
        if count > 0:
            topics.append(category)
            log(f"Added {count} items to {category}")
    # Update history index
    update_history_index(args.session_id, args.transcript, topics)
    log(f"Summarization complete: {total_added} total items added")
    # Reindex RAG if we added items
    if total_added > 0:
        log("Triggering RAG reindex...")
        try:
            reindex_result = subprocess.run(
                [
                    str(Path.home() / ".claude/skills/rag-search/venv/bin/python"),
                    str(Path.home() / ".claude/skills/rag-search/scripts/index_personal.py"),
                    "--quiet"
                ],
                capture_output=True,
                text=True,
                timeout=120
            )
            if reindex_result.returncode == 0:
                log("RAG reindex completed successfully")
            else:
                log(f"RAG reindex failed: {reindex_result.stderr[:200]}")
        except subprocess.TimeoutExpired:
            log("RAG reindex timed out after 120s")
        except Exception as e:
            log(f"RAG reindex error: {e}")


if __name__ == "__main__":
    main()