#!/usr/bin/env python3
"""Summarize a Claude Code session transcript and extract key information to memory.

This script:

1. Loads the transcript from Claude's storage
2. Checks if the session is substantive (>= 3 user messages)
3. Extracts facts/paths via heuristics
4. Uses Claude CLI (with subscription auth) for decisions/preferences if substantive
5. Updates memory files and marks the session as summarized
"""

import argparse
import json
import re
import shutil
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path

# Paths
STATE_DIR = Path.home() / ".claude/state/personal-assistant"
MEMORY_DIR = STATE_DIR / "memory"
HISTORY_INDEX = STATE_DIR / "history/index.json"

# Memory files
MEMORY_FILES = {
    "decisions": MEMORY_DIR / "decisions.json",
    "preferences": MEMORY_DIR / "preferences.json",
    "projects": MEMORY_DIR / "projects.json",
    "facts": MEMORY_DIR / "facts.json",
}

# Minimum threshold for substantive sessions
MIN_USER_MESSAGES = 3


def log(msg: str) -> None:
    """Log with timestamp."""
    print(f"[{datetime.now().isoformat()}] {msg}", file=sys.stderr)


def load_transcript(path: str) -> list[dict]:
    """Load transcript from a JSONL file."""
    messages = []
    try:
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    messages.append(json.loads(line))
    except Exception as e:
        log(f"Error loading transcript: {e}")
    return messages


def count_user_messages(transcript: list[dict]) -> int:
    """Count the number of user messages in the transcript."""
    # Claude Code format: type is "user" or "assistant" at top level.
    # Tool results also arrive as type "user" entries whose content is a
    # list of tool_result blocks, so skip those to avoid inflating the count.
    count = 0
    for entry in transcript:
        if entry.get("type") != "user":
            continue
        message = entry.get("message", {})
        content = message.get("content", "") if isinstance(message, dict) else ""
        if isinstance(content, list) and all(
            isinstance(b, dict) and b.get("type") == "tool_result"
            for b in content
        ):
            continue
        count += 1
    return count


def extract_conversation_text(transcript: list[dict]) -> str:
    """Extract readable conversation text from the transcript."""
    parts = []
    for entry in transcript:
        entry_type = entry.get("type", "")
        # Skip non-message entries (like queue-operation)
        if entry_type not in ("user", "assistant"):
            continue
        message = entry.get("message", {})
        if not isinstance(message, dict):
            continue
        role = message.get("role", entry_type)
        content = message.get("content", "")
        # Handle different content formats
        if isinstance(content, list):
            # Assistant messages have content as an array of blocks
            text_parts = []
            for block in content:
                if isinstance(block, dict):
                    if block.get("type") == "text":
                        text_parts.append(block.get("text", ""))
                    elif block.get("type") == "tool_use":
                        text_parts.append(f"[Tool: {block.get('name', 'unknown')}]")
                elif isinstance(block, str):
                    text_parts.append(block)
            content = "\n".join(text_parts)
        elif isinstance(content, str):
            # User messages have content as a plain string
            pass
        else:
            continue
        if content:
            parts.append(f"[{role}]: {content[:2000]}")  # Truncate long messages
    return "\n\n".join(parts)


def heuristic_extraction(transcript: list[dict]) -> dict[str, list[dict]]:
    """Extract simple facts and paths using heuristics."""
    results = {"projects": [], "facts": []}
    conversation = extract_conversation_text(transcript)

    # Extract file paths mentioned
    path_pattern = r'(?:/[\w.-]+)+(?:/[\w.-]*)?'
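    # The pattern above matches absolute Unix-style paths built from repeated
    # "/segment" groups of word characters, dots, and dashes, e.g.
    # "/home/user/project" or "/Users/me/.claude/state". Path-like fragments
    # of URLs can also match; the allowlist filter below screens those out.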
    paths = set(re.findall(path_pattern, conversation))

    # Filter to likely project paths
    project_paths = set()
    for p in paths:
        if any(x in p for x in ['/home/', '/Users/', '/.claude/', '/projects/']):
            if not any(x in p for x in ['/proc/', '/sys/', '/dev/', '/tmp/']):
                project_paths.add(p)

    # Add unique project paths as context
    for path in list(project_paths)[:5]:  # Limit to 5 paths
        results["projects"].append({
            "content": f"Worked with path: {path}",
            "context": "File path referenced in session"
        })

    # Extract tool/environment facts
    tool_patterns = [
        (r'using\s+([\w-]+)\s+version\s+([\d.]+)', "Tool version: {0} {1}"),
        (r'(python|node|npm|pip)\s+.*?([\d.]+)', "Runtime: {0} {1}"),
    ]
    for pattern, template in tool_patterns:
        matches = re.findall(pattern, conversation, re.IGNORECASE)
        for match in matches[:2]:  # Limit matches
            results["facts"].append({
                "content": template.format(*match),
                "context": "Environment fact from session"
            })

    return results


def llm_extraction(conversation_text: str, session_id: str) -> dict[str, list[dict]]:
    """Use Claude CLI to extract decisions and preferences."""
    results = {"decisions": [], "preferences": []}

    # Check if the claude CLI is available on PATH
    claude_path = shutil.which("claude")
    if not claude_path:
        log("Claude CLI not found, skipping LLM extraction")
        return results

    prompt = f"""Analyze this conversation excerpt and extract key information.

CONVERSATION:
{conversation_text[:15000]}

Extract and return a JSON object with:
1. "decisions": List of decisions made (choices, directions taken, approaches selected)
2. "preferences": List of user preferences learned (likes, dislikes, preferred approaches)

For each item include:
- "content": Brief description (1 sentence)
- "context": Why this matters or additional context

Only include genuinely significant items. Skip trivial or obvious things.
Return valid JSON only, no markdown formatting.
Example format: {{"decisions": [{{"content": "Use PostgreSQL for the database", "context": "Chosen for JSONB support"}}], "preferences": [{{"content": "Prefers explicit type annotations", "context": "For code clarity"}}]}}"""

    try:
        # Use claude CLI in print mode with haiku for cost efficiency
        result = subprocess.run(
            [
                claude_path, "-p",
                "--model", "haiku",
                "--no-session-persistence",
                prompt
            ],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(Path.home())  # Run from home to avoid project context
        )
        if result.returncode != 0:
            log(f"Claude CLI failed: {result.stderr[:500]}")
            return results

        response_text = result.stdout.strip()

        # Try to extract JSON from the response
        try:
            # Handle potential markdown code blocks
            if "```" in response_text:
                json_match = re.search(
                    r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL
                )
                if json_match:
                    response_text = json_match.group(1)
            data = json.loads(response_text)
            for key in ["decisions", "preferences"]:
                if key in data and isinstance(data[key], list):
                    for item in data[key][:5]:  # Limit to 5 per category
                        if isinstance(item, dict) and "content" in item:
                            results[key].append({
                                "content": item["content"],
                                "context": item.get("context", "")
                            })
        except json.JSONDecodeError as e:
            log(f"Failed to parse LLM response as JSON: {e}")
            log(f"Response was: {response_text[:500]}")
    except subprocess.TimeoutExpired:
        log("Claude CLI timed out")
    except Exception as e:
        log(f"LLM extraction error: {e}")

    return results


def load_memory_file(path: Path) -> dict:
    """Load a memory file, creating default structure if needed."""
    if path.exists():
        try:
            with open(path) as f:
                return json.load(f)
        except json.JSONDecodeError:
            pass
    # Default structure
    return {
        "version": "1.0",
        "description": f"{path.stem.title()} extracted from sessions",
        "items": []
    }


def save_memory_file(path: Path, data: dict) -> None:
    """Save a memory file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def add_to_memory(category: str, items: list[dict], session_id: str) -> int:
    """Add items to a memory category.
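
    Deduplicates case-insensitively on "content", so re-running summarization
    does not accumulate duplicate entries across sessions.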

    Returns count of items added.
    """
    if not items:
        return 0
    path = MEMORY_FILES.get(category)
    if not path:
        return 0

    data = load_memory_file(path)
    today = datetime.now().strftime("%Y-%m-%d")

    # Check for duplicates based on content
    existing_content = {item.get("content", "").lower() for item in data.get("items", [])}

    added = 0
    for item in items:
        content = item.get("content", "")
        if content.lower() not in existing_content:
            data["items"].append({
                "id": str(uuid.uuid4()),
                "date": today,
                "content": content,
                "context": item.get("context", ""),
                "session": session_id
            })
            existing_content.add(content.lower())
            added += 1

    if added > 0:
        save_memory_file(path, data)
    return added


def update_history_index(session_id: str, transcript_path: str, topics: list[str]) -> None:
    """Mark session as summarized in the history index."""
    if not HISTORY_INDEX.exists():
        log(f"History index not found: {HISTORY_INDEX}")
        return
    try:
        with open(HISTORY_INDEX) as f:
            data = json.load(f)

        # Find and update the session
        for session in data.get("sessions", []):
            if session.get("id") == session_id:
                session["summarized"] = True
                session["transcript_path"] = transcript_path
                session["topics"] = topics[:5]  # Limit topics
                session["summarized_at"] = datetime.now().isoformat()
                break

        with open(HISTORY_INDEX, "w") as f:
            json.dump(data, f, indent=2)
        log(f"Updated history index for session {session_id}")
    except Exception as e:
        log(f"Error updating history index: {e}")


def main():
    parser = argparse.ArgumentParser(description="Summarize a Claude Code session")
    parser.add_argument("--session-id", required=True, help="Session ID")
    parser.add_argument("--transcript", required=True, help="Path to transcript file")
    parser.add_argument("--reason", default="", help="Session end reason")
    args = parser.parse_args()

    log(f"Starting summarization for session {args.session_id}")

    # Load transcript
    transcript = load_transcript(args.transcript)
    if not transcript:
        log("Empty or invalid transcript, skipping")
        return

    # Check threshold
    user_msg_count = count_user_messages(transcript)
    log(f"Found {user_msg_count} user messages")
    if user_msg_count < MIN_USER_MESSAGES:
        log(f"Session below threshold ({MIN_USER_MESSAGES}), marking as summarized without extraction")
        update_history_index(args.session_id, args.transcript, ["trivial"])
        return

    # Extract conversation text
    conversation_text = extract_conversation_text(transcript)

    # Heuristic extraction (always run)
    log("Running heuristic extraction...")
    heuristic_results = heuristic_extraction(transcript)

    # LLM extraction (for substantive sessions)
    log("Running LLM extraction...")
    llm_results = llm_extraction(conversation_text, args.session_id)

    # Combine results
    all_results = {
        "decisions": llm_results.get("decisions", []),
        "preferences": llm_results.get("preferences", []),
        "projects": heuristic_results.get("projects", []),
        "facts": heuristic_results.get("facts", []),
    }

    # Save to memory files
    total_added = 0
    topics = []
    for category, items in all_results.items():
        count = add_to_memory(category, items, args.session_id)
        total_added += count
        if count > 0:
            topics.append(category)
            log(f"Added {count} items to {category}")

    # Update history index
    update_history_index(args.session_id, args.transcript, topics)
    log(f"Summarization complete: {total_added} total items added")


if __name__ == "__main__":
    main()
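
# Example invocation (the script filename, session ID, and transcript path
# below are illustrative, not prescribed by this repo):
#
#   python3 summarize_session.py \
#       --session-id abc123 \
#       --transcript ~/.claude/projects/my-project/abc123.jsonl \
#       --reason logout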