#!/bin/bash
# Session end hook - triggers summarization of the conversation.
#
# Registered in hooks.json under "SessionEnd" as a command hook with a
# 120-second timeout. Receives JSON via stdin with session_id,
# transcript_path, reason.
#
# Uses Claude CLI with subscription credentials for LLM extraction.
# Heuristic extraction (paths, facts) always runs.
# LLM extraction (decisions, preferences) runs if claude CLI is available.

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="${HOME}/.claude/logs/session-end.log"

# Ensure log directory exists
mkdir -p "$(dirname "$LOG_FILE")"

log() {
  echo "[$(date -Iseconds)] $*" >> "$LOG_FILE"
}

# Read JSON input from stdin
INPUT=$(cat)

# Parse all three JSON fields with a single python3 invocation
# (previously one interpreter was spawned per field). One value per line,
# in a fixed order; any parse failure yields three empty fields.
PARSED=$(printf '%s' "$INPUT" | python3 -c '
import json, sys
try:
    d = json.load(sys.stdin)
except Exception:
    d = {}
for k in ("session_id", "transcript_path", "reason"):
    print(d.get(k, ""))
' 2>/dev/null) || PARSED=$'\n\n'

{
  read -r SESSION_ID
  read -r TRANSCRIPT_PATH
  read -r REASON
} <<< "$PARSED"

log "SessionEnd triggered: session=$SESSION_ID reason=$REASON"

# Validate required fields
if [[ -z "$SESSION_ID" || -z "$TRANSCRIPT_PATH" ]]; then
  log "ERROR: Missing session_id or transcript_path"
  exit 0  # Exit cleanly - don't break session exit
fi

# Check if transcript exists
if [[ ! -f "$TRANSCRIPT_PATH" ]]; then
  log "ERROR: Transcript not found at $TRANSCRIPT_PATH"
  exit 0
fi

# Run summarization script in background to not block session exit.
# The script will handle its own error logging.
nohup python3 "${SCRIPT_DIR}/summarize-transcript.py" \
  --session-id "$SESSION_ID" \
  --transcript "$TRANSCRIPT_PATH" \
  --reason "$REASON" \
  >> "$LOG_FILE" 2>&1 &

log "Summarization started in background (PID: $!)"

# Return success - don't block session exit
exit 0
"""
Summarize a Claude Code session transcript and extract key information to memory.

This script:
1. Loads the transcript from Claude's storage
2. Checks if session is substantive (>= 3 user messages)
3. Extracts facts/paths via heuristics
4. Uses Claude CLI (with subscription auth) for decisions/preferences if substantive
5. Updates memory files and marks session as summarized
"""

import argparse
import json
import os
import re
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any

# Paths
STATE_DIR = Path.home() / ".claude/state/personal-assistant"
MEMORY_DIR = STATE_DIR / "memory"
HISTORY_INDEX = STATE_DIR / "history/index.json"

# Memory files: one JSON store per extraction category.
MEMORY_FILES = {
    "decisions": MEMORY_DIR / "decisions.json",
    "preferences": MEMORY_DIR / "preferences.json",
    "projects": MEMORY_DIR / "projects.json",
    "facts": MEMORY_DIR / "facts.json",
}

# Minimum number of user messages for a session to be considered substantive.
MIN_USER_MESSAGES = 3


def log(msg: str) -> None:
    """Log a timestamped message to stderr (captured by the hook's log file)."""
    print(f"[{datetime.now().isoformat()}] {msg}", file=sys.stderr)


def load_transcript(path: str) -> list[dict]:
    """Load a transcript from a jsonl file.

    Malformed lines are skipped individually so one corrupt record does not
    discard the remainder of the transcript (previously the first bad line
    aborted the whole load). Returns [] if the file cannot be read at all.
    """
    messages: list[dict] = []
    try:
        with open(path, "r") as f:
            for lineno, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue
                try:
                    messages.append(json.loads(line))
                except json.JSONDecodeError as e:
                    log(f"Skipping malformed transcript line {lineno}: {e}")
    except OSError as e:
        log(f"Error loading transcript: {e}")
    return messages


def count_user_messages(transcript: list[dict]) -> int:
    """Count the number of user messages in the transcript.

    Claude Code format: entries carry type "user" or "assistant" at top level.
    """
    return sum(1 for entry in transcript if entry.get("type") == "user")


def extract_conversation_text(transcript: list[dict]) -> str:
    """Render the transcript as readable "[role]: text" paragraphs.

    Non-message entries (e.g. queue-operation) are skipped. Assistant content
    arrives as a list of blocks (text / tool_use); user content as a plain
    string. Each message is truncated to 2000 characters.
    """
    parts = []
    for entry in transcript:
        entry_type = entry.get("type", "")

        # Skip non-message entries (like queue-operation)
        if entry_type not in ("user", "assistant"):
            continue

        message = entry.get("message", {})
        if not isinstance(message, dict):
            continue

        role = message.get("role", entry_type)
        content = message.get("content", "")

        # Handle different content formats
        if isinstance(content, list):
            # Assistant messages have content as array of blocks
            text_parts = []
            for block in content:
                if isinstance(block, dict):
                    if block.get("type") == "text":
                        text_parts.append(block.get("text", ""))
                    elif block.get("type") == "tool_use":
                        text_parts.append(f"[Tool: {block.get('name', 'unknown')}]")
                elif isinstance(block, str):
                    text_parts.append(block)
            content = "\n".join(text_parts)
        elif isinstance(content, str):
            # User messages have content as string
            pass
        else:
            continue

        if content:
            parts.append(f"[{role}]: {content[:2000]}")  # Truncate long messages

    return "\n\n".join(parts)


def heuristic_extraction(transcript: list[dict]) -> dict[str, list[dict]]:
    """Extract simple facts and paths using regex heuristics.

    Returns a dict with "projects" (file paths worked on) and "facts"
    (tool/runtime versions) lists, each item shaped {content, context}.
    """
    results = {"projects": [], "facts": []}

    conversation = extract_conversation_text(transcript)

    # Extract file paths mentioned
    path_pattern = r'(?:/[\w.-]+)+(?:/[\w.-]*)?'
    paths = set(re.findall(path_pattern, conversation))

    # Filter to likely project paths; exclude system pseudo-filesystems.
    project_paths = set()
    for p in paths:
        if any(x in p for x in ['/home/', '/Users/', '/.claude/', '/projects/']):
            if not any(x in p for x in ['/proc/', '/sys/', '/dev/', '/tmp/']):
                project_paths.add(p)

    # sorted() makes the 5-path cutoff deterministic (set iteration order
    # varies between runs, so the previous list(...)[:5] picked arbitrarily).
    for path in sorted(project_paths)[:5]:  # Limit to 5 paths
        results["projects"].append({
            "content": f"Worked with path: {path}",
            "context": "File path referenced in session"
        })

    # Extract tool/environment facts
    tool_patterns = [
        (r'using\s+([\w-]+)\s+version\s+([\d.]+)', "Tool version: {0} {1}"),
        (r'(python|node|npm|pip)\s+.*?([\d.]+)', "Runtime: {0} {1}"),
    ]

    for pattern, template in tool_patterns:
        matches = re.findall(pattern, conversation, re.IGNORECASE)
        for match in matches[:2]:  # Limit matches
            results["facts"].append({
                "content": template.format(*match),
                "context": "Environment fact from session"
            })

    return results
def llm_extraction(conversation_text: str, session_id: str) -> dict[str, list[dict]]:
    """Use Claude CLI to extract decisions and preferences.

    Args:
        conversation_text: Rendered conversation (truncated to 15000 chars
            before being embedded in the prompt).
        session_id: Accepted for interface parity; not currently sent to
            the model.

    Returns a dict with "decisions" and "preferences" lists (possibly empty
    if the CLI is unavailable, fails, times out, or returns unparseable JSON).
    """
    # Local import keeps the module-level import block unchanged.
    import shutil

    results: dict[str, list[dict]] = {"decisions": [], "preferences": []}

    # shutil.which is portable and avoids spawning a subprocess just to
    # locate the binary (previously this shelled out to `which`).
    claude_path = shutil.which("claude") or ""

    if not claude_path:
        log("Claude CLI not found, skipping LLM extraction")
        return results

    prompt = f"""Analyze this conversation excerpt and extract key information.

CONVERSATION:
{conversation_text[:15000]}

Extract and return a JSON object with:
1. "decisions": List of decisions made (choices, directions taken, approaches selected)
2. "preferences": List of user preferences learned (likes, dislikes, preferred approaches)

For each item include:
- "content": Brief description (1 sentence)
- "context": Why this matters or additional context

Only include genuinely significant items. Skip trivial or obvious things.
Return valid JSON only, no markdown formatting.

Example format:
{{"decisions": [{{"content": "Use PostgreSQL for the database", "context": "Chosen for JSONB support"}}], "preferences": [{{"content": "Prefers explicit type annotations", "context": "For code clarity"}}]}}"""

    try:
        # Use claude CLI in print mode with haiku for cost efficiency
        result = subprocess.run(
            [
                claude_path, "-p",
                "--model", "haiku",
                "--no-session-persistence",
                prompt
            ],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(Path.home())  # Run from home to avoid project context
        )

        if result.returncode != 0:
            log(f"Claude CLI failed: {result.stderr[:500]}")
            return results

        response_text = result.stdout.strip()

        # Try to extract JSON from response
        try:
            # Handle potential markdown code blocks despite the prompt asking
            # for raw JSON - models sometimes fence their output anyway.
            if "```" in response_text:
                json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
                if json_match:
                    response_text = json_match.group(1)

            data = json.loads(response_text)

            for key in ["decisions", "preferences"]:
                if key in data and isinstance(data[key], list):
                    for item in data[key][:5]:  # Limit to 5 per category
                        if isinstance(item, dict) and "content" in item:
                            results[key].append({
                                "content": item["content"],
                                "context": item.get("context", "")
                            })
        except json.JSONDecodeError as e:
            log(f"Failed to parse LLM response as JSON: {e}")
            log(f"Response was: {response_text[:500]}")

    except subprocess.TimeoutExpired:
        log("Claude CLI timed out")
    except Exception as e:
        log(f"LLM extraction error: {e}")

    return results


def load_memory_file(path: Path) -> dict:
    """Load a memory file, falling back to a default structure.

    The default is returned when the file is missing, unreadable, or not
    valid JSON (OSError was previously unhandled and would crash the run
    on an existing-but-unreadable file).
    """
    if path.exists():
        try:
            with open(path) as f:
                return json.load(f)
        except (json.JSONDecodeError, OSError):
            pass

    # Default structure
    return {
        "version": "1.0",
        "description": f"{path.stem.title()} extracted from sessions",
        "items": []
    }


def save_memory_file(path: Path, data: dict) -> None:
    """Save a memory file, creating parent directories as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(data, f, indent=2)
def add_to_memory(category: str, items: list[dict], session_id: str) -> int:
    """Add items to a memory category, deduplicating by content.

    Returns the count of items actually added (0 for an empty list or an
    unknown category). Duplicate detection is case-insensitive on "content".
    """
    if not items:
        return 0

    path = MEMORY_FILES.get(category)
    if path is None:
        return 0

    data = load_memory_file(path)
    today = datetime.now().strftime("%Y-%m-%d")

    # setdefault guards against a pre-existing memory file that lacks an
    # "items" key - previously data["items"] would raise KeyError while the
    # dedupe set was built with data.get("items", []).
    items_list = data.setdefault("items", [])
    existing_content = {item.get("content", "").lower() for item in items_list}

    added = 0
    for item in items:
        content = item.get("content", "")
        if content.lower() not in existing_content:
            items_list.append({
                "id": str(uuid.uuid4()),
                "date": today,
                "content": content,
                "context": item.get("context", ""),
                "session": session_id
            })
            existing_content.add(content.lower())
            added += 1

    if added > 0:
        save_memory_file(path, data)

    return added


def update_history_index(session_id: str, transcript_path: str, topics: list[str]) -> None:
    """Mark a session as summarized in the history index.

    Best-effort: missing index or any I/O/JSON error is logged, never raised,
    so summarization cannot fail on index bookkeeping.
    """
    if not HISTORY_INDEX.exists():
        log(f"History index not found: {HISTORY_INDEX}")
        return

    try:
        with open(HISTORY_INDEX) as f:
            data = json.load(f)

        # Find and update the session
        found = False
        for session in data.get("sessions", []):
            if session.get("id") == session_id:
                session["summarized"] = True
                session["transcript_path"] = transcript_path
                session["topics"] = topics[:5]  # Limit topics
                session["summarized_at"] = datetime.now().isoformat()
                found = True
                break

        # Previously an absent session id was silently ignored and the index
        # rewritten unchanged; surface it for debugging.
        if not found:
            log(f"Session {session_id} not present in history index")

        with open(HISTORY_INDEX, "w") as f:
            json.dump(data, f, indent=2)

        log(f"Updated history index for session {session_id}")

    except Exception as e:
        log(f"Error updating history index: {e}")


def main():
    """Entry point: load the transcript, extract memory items, update the index.

    Trivial sessions (fewer than MIN_USER_MESSAGES user messages) are marked
    summarized without extraction. Heuristic extraction always runs for
    substantive sessions; LLM extraction is best-effort.
    """
    parser = argparse.ArgumentParser(description="Summarize a Claude Code session")
    parser.add_argument("--session-id", required=True, help="Session ID")
    parser.add_argument("--transcript", required=True, help="Path to transcript file")
    parser.add_argument("--reason", default="", help="Session end reason")
    args = parser.parse_args()

    log(f"Starting summarization for session {args.session_id}")

    # Load transcript
    transcript = load_transcript(args.transcript)
    if not transcript:
        log("Empty or invalid transcript, skipping")
        return

    # Check threshold
    user_msg_count = count_user_messages(transcript)
    log(f"Found {user_msg_count} user messages")

    if user_msg_count < MIN_USER_MESSAGES:
        log(f"Session below threshold ({MIN_USER_MESSAGES}), marking as summarized without extraction")
        update_history_index(args.session_id, args.transcript, ["trivial"])
        return

    # Extract conversation text
    conversation_text = extract_conversation_text(transcript)

    # Heuristic extraction (always run)
    log("Running heuristic extraction...")
    heuristic_results = heuristic_extraction(transcript)

    # LLM extraction (for substantive sessions)
    log("Running LLM extraction...")
    llm_results = llm_extraction(conversation_text, args.session_id)

    # Combine results
    all_results = {
        "decisions": llm_results.get("decisions", []),
        "preferences": llm_results.get("preferences", []),
        "projects": heuristic_results.get("projects", []),
        "facts": heuristic_results.get("facts", []),
    }

    # Save to memory files
    total_added = 0
    topics = []
    for category, items in all_results.items():
        count = add_to_memory(category, items, args.session_id)
        total_added += count
        if count > 0:
            topics.append(category)
            log(f"Added {count} items to {category}")

    # Update history index
    update_history_index(args.session_id, args.transcript, topics)

    log(f"Summarization complete: {total_added} total items added")


if __name__ == "__main__":
    main()