Add SessionEnd hook for automatic session summarization

Implements automatic memory extraction when Claude Code sessions end:
- Add SessionEnd hook to hooks.json with 120s timeout
- Create session-end.sh wrapper that parses hook input and runs summarizer
- Create summarize-transcript.py that:
  - Loads transcript from Claude's storage
  - Skips trivial sessions (<3 user messages)
  - Extracts paths/facts via heuristics
  - Uses Claude CLI (subscription auth) for decisions/preferences
  - Saves to memory files with deduplication
  - Updates history index with summarized flag

Uses `claude -p --model haiku` for LLM extraction, leveraging
existing subscription credentials instead of requiring an API key.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
OpenCode Test committed 2026-01-03 14:13:43 -08:00
parent 91fa0608d0
commit f07022ca60
3 changed files with 448 additions and 0 deletions

hooks.json

@@ -19,6 +19,17 @@
          }
        ]
      }
    ],
    "SessionEnd": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "~/.claude/hooks/scripts/session-end.sh",
            "timeout": 120
          }
        ]
      }
    ]
  }
}
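
For reference, the stdin payload that a SessionEnd hook command receives looks
roughly like this (field names taken from the wrapper script below; the exact
shape and values are illustrative assumptions):

    {
      "session_id": "abc123",
      "transcript_path": "/home/user/.claude/projects/myproject/abc123.jsonl",
      "reason": "exit"
    }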

hooks/scripts/session-end.sh (executable file, 54 lines added)

@@ -0,0 +1,54 @@
#!/bin/bash
# Session end hook - triggers summarization of the conversation.
# Receives JSON via stdin with session_id, transcript_path, reason.
#
# Uses Claude CLI with subscription credentials for LLM extraction.
# Heuristic extraction (paths, facts) always runs.
# LLM extraction (decisions, preferences) runs if the claude CLI is available.

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="${HOME}/.claude/logs/session-end.log"

# Ensure log directory exists
mkdir -p "$(dirname "$LOG_FILE")"

log() {
    echo "[$(date -Iseconds)] $*" >> "$LOG_FILE"
}

# Read JSON input from stdin
INPUT=$(cat)

# Parse JSON fields
SESSION_ID=$(echo "$INPUT" | python3 -c "import sys, json; print(json.load(sys.stdin).get('session_id', ''))" 2>/dev/null || echo "")
TRANSCRIPT_PATH=$(echo "$INPUT" | python3 -c "import sys, json; print(json.load(sys.stdin).get('transcript_path', ''))" 2>/dev/null || echo "")
REASON=$(echo "$INPUT" | python3 -c "import sys, json; print(json.load(sys.stdin).get('reason', ''))" 2>/dev/null || echo "")

log "SessionEnd triggered: session=$SESSION_ID reason=$REASON"

# Validate required fields
if [[ -z "$SESSION_ID" || -z "$TRANSCRIPT_PATH" ]]; then
    log "ERROR: Missing session_id or transcript_path"
    exit 0  # Exit cleanly - don't break session exit
fi

# Check that the transcript exists
if [[ ! -f "$TRANSCRIPT_PATH" ]]; then
    log "ERROR: Transcript not found at $TRANSCRIPT_PATH"
    exit 0
fi

# Run the summarization script in the background so it doesn't block session exit.
# The script handles its own error logging.
nohup python3 "${SCRIPT_DIR}/summarize-transcript.py" \
    --session-id "$SESSION_ID" \
    --transcript "$TRANSCRIPT_PATH" \
    --reason "$REASON" \
    >> "$LOG_FILE" 2>&1 &

log "Summarization started in background (PID: $!)"

# Return success - don't block session exit
exit 0
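
A quick way to exercise the wrapper by hand (a minimal smoke test; the payload
fields match what the script parses, but the paths and IDs are hypothetical):

    printf '%s\n' '{"type":"user","message":{"role":"user","content":"hello"}}' > /tmp/example.jsonl
    echo '{"session_id":"test-123","transcript_path":"/tmp/example.jsonl","reason":"exit"}' \
        | ~/.claude/hooks/scripts/session-end.sh
    tail -n 5 ~/.claude/logs/session-end.log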

hooks/scripts/summarize-transcript.py (383 lines added)

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
Summarize a Claude Code session transcript and extract key information to memory.

This script:
1. Loads the transcript from Claude's storage
2. Checks if the session is substantive (>= 3 user messages)
3. Extracts facts/paths via heuristics
4. Uses Claude CLI (with subscription auth) for decisions/preferences if substantive
5. Updates memory files and marks the session as summarized
"""
import argparse
import json
import os
import re
import subprocess
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any

# Paths
STATE_DIR = Path.home() / ".claude/state/personal-assistant"
MEMORY_DIR = STATE_DIR / "memory"
HISTORY_INDEX = STATE_DIR / "history/index.json"

# Memory files
MEMORY_FILES = {
    "decisions": MEMORY_DIR / "decisions.json",
    "preferences": MEMORY_DIR / "preferences.json",
    "projects": MEMORY_DIR / "projects.json",
    "facts": MEMORY_DIR / "facts.json",
}

# Minimum threshold for substantive sessions
MIN_USER_MESSAGES = 3


def log(msg: str) -> None:
    """Log with timestamp."""
    print(f"[{datetime.now().isoformat()}] {msg}", file=sys.stderr)


def load_transcript(path: str) -> list[dict]:
    """Load transcript from a jsonl file."""
    messages = []
    try:
        with open(path, "r") as f:
            for line in f:
                line = line.strip()
                if line:
                    messages.append(json.loads(line))
    except Exception as e:
        log(f"Error loading transcript: {e}")
    return messages
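

# Illustrative (assumed) shapes of transcript jsonl lines in Claude Code's
# format, matching what count_user_messages and extract_conversation_text
# below expect:
#   {"type": "user", "message": {"role": "user", "content": "Fix the login bug"}}
#   {"type": "assistant", "message": {"role": "assistant", "content": [
#       {"type": "text", "text": "Looking into it..."},
#       {"type": "tool_use", "name": "Read"}]}}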


def count_user_messages(transcript: list[dict]) -> int:
    """Count the number of user messages in the transcript."""
    count = 0
    for entry in transcript:
        # Claude Code format: type is "user" or "assistant" at top level
        if entry.get("type") == "user":
            count += 1
    return count


def extract_conversation_text(transcript: list[dict]) -> str:
    """Extract readable conversation text from the transcript."""
    parts = []
    for entry in transcript:
        entry_type = entry.get("type", "")
        # Skip non-message entries (like queue-operation)
        if entry_type not in ("user", "assistant"):
            continue
        message = entry.get("message", {})
        if not isinstance(message, dict):
            continue
        role = message.get("role", entry_type)
        content = message.get("content", "")
        # Handle different content formats
        if isinstance(content, list):
            # Assistant messages have content as an array of blocks
            text_parts = []
            for block in content:
                if isinstance(block, dict):
                    if block.get("type") == "text":
                        text_parts.append(block.get("text", ""))
                    elif block.get("type") == "tool_use":
                        text_parts.append(f"[Tool: {block.get('name', 'unknown')}]")
                elif isinstance(block, str):
                    text_parts.append(block)
            content = "\n".join(text_parts)
        elif isinstance(content, str):
            # User messages have content as a plain string
            pass
        else:
            continue
        if content:
            parts.append(f"[{role}]: {content[:2000]}")  # Truncate long messages
    return "\n\n".join(parts)


def heuristic_extraction(transcript: list[dict]) -> dict[str, list[dict]]:
    """Extract simple facts and paths using heuristics."""
    results = {"projects": [], "facts": []}
    conversation = extract_conversation_text(transcript)

    # Extract file paths mentioned
    path_pattern = r'(?:/[\w.-]+)+(?:/[\w.-]*)?'
    paths = set(re.findall(path_pattern, conversation))

    # Filter to likely project paths
    project_paths = set()
    for p in paths:
        if any(x in p for x in ['/home/', '/Users/', '/.claude/', '/projects/']):
            if not any(x in p for x in ['/proc/', '/sys/', '/dev/', '/tmp/']):
                project_paths.add(p)

    # Add unique project paths as context
    for path in list(project_paths)[:5]:  # Limit to 5 paths
        results["projects"].append({
            "content": f"Worked with path: {path}",
            "context": "File path referenced in session"
        })

    # Extract tool/environment facts
    tool_patterns = [
        (r'using\s+([\w-]+)\s+version\s+([\d.]+)', "Tool version: {0} {1}"),
        (r'(python|node|npm|pip)\s+.*?([\d.]+)', "Runtime: {0} {1}"),
    ]
    for pattern, template in tool_patterns:
        matches = re.findall(pattern, conversation, re.IGNORECASE)
        for match in matches[:2]:  # Limit matches
            results["facts"].append({
                "content": template.format(*match),
                "context": "Environment fact from session"
            })
    return results
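

# Illustrative (hypothetical) heuristic_extraction output for a session that
# mentioned /home/user/projects/app and "using ripgrep version 14.1.0":
#   {"projects": [{"content": "Worked with path: /home/user/projects/app",
#                  "context": "File path referenced in session"}],
#    "facts": [{"content": "Tool version: ripgrep 14.1.0",
#               "context": "Environment fact from session"}]}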


def llm_extraction(conversation_text: str, session_id: str) -> dict[str, list[dict]]:
    """Use Claude CLI to extract decisions and preferences."""
    results = {"decisions": [], "preferences": []}

    # Check if the claude CLI is available
    claude_path = subprocess.run(
        ["which", "claude"], capture_output=True, text=True
    ).stdout.strip()
    if not claude_path:
        log("Claude CLI not found, skipping LLM extraction")
        return results

    prompt = f"""Analyze this conversation excerpt and extract key information.

CONVERSATION:
{conversation_text[:15000]}

Extract and return a JSON object with:
1. "decisions": List of decisions made (choices, directions taken, approaches selected)
2. "preferences": List of user preferences learned (likes, dislikes, preferred approaches)

For each item include:
- "content": Brief description (1 sentence)
- "context": Why this matters or additional context

Only include genuinely significant items. Skip trivial or obvious things.
Return valid JSON only, no markdown formatting.

Example format:
{{"decisions": [{{"content": "Use PostgreSQL for the database", "context": "Chosen for JSONB support"}}], "preferences": [{{"content": "Prefers explicit type annotations", "context": "For code clarity"}}]}}"""

    try:
        # Use claude CLI in print mode with haiku for cost efficiency
        result = subprocess.run(
            [
                claude_path, "-p",
                "--model", "haiku",
                "--no-session-persistence",
                prompt
            ],
            capture_output=True,
            text=True,
            timeout=60,
            cwd=str(Path.home())  # Run from home to avoid project context
        )
        if result.returncode != 0:
            log(f"Claude CLI failed: {result.stderr[:500]}")
            return results
        response_text = result.stdout.strip()

        # Try to extract JSON from the response
        try:
            # Handle potential markdown code blocks
            if "```" in response_text:
                json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', response_text, re.DOTALL)
                if json_match:
                    response_text = json_match.group(1)
            data = json.loads(response_text)
            for key in ["decisions", "preferences"]:
                if key in data and isinstance(data[key], list):
                    for item in data[key][:5]:  # Limit to 5 per category
                        if isinstance(item, dict) and "content" in item:
                            results[key].append({
                                "content": item["content"],
                                "context": item.get("context", "")
                            })
        except json.JSONDecodeError as e:
            log(f"Failed to parse LLM response as JSON: {e}")
            log(f"Response was: {response_text[:500]}")
    except subprocess.TimeoutExpired:
        log("Claude CLI timed out")
    except Exception as e:
        log(f"LLM extraction error: {e}")
    return results


def load_memory_file(path: Path) -> dict:
    """Load a memory file, creating the default structure if needed."""
    if path.exists():
        try:
            with open(path) as f:
                return json.load(f)
        except json.JSONDecodeError:
            pass
    # Default structure
    return {
        "version": "1.0",
        "description": f"{path.stem.title()} extracted from sessions",
        "items": []
    }


def save_memory_file(path: Path, data: dict) -> None:
    """Save a memory file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as f:
        json.dump(data, f, indent=2)


def add_to_memory(category: str, items: list[dict], session_id: str) -> int:
    """Add items to a memory category. Returns the count of items added."""
    if not items:
        return 0
    path = MEMORY_FILES.get(category)
    if not path:
        return 0

    data = load_memory_file(path)
    today = datetime.now().strftime("%Y-%m-%d")

    # Check for duplicates based on content (case-insensitive)
    existing_content = {item.get("content", "").lower() for item in data.get("items", [])}
    added = 0
    for item in items:
        content = item.get("content", "")
        if content.lower() not in existing_content:
            # setdefault guards against a pre-existing file without an "items" key
            data.setdefault("items", []).append({
                "id": str(uuid.uuid4()),
                "date": today,
                "content": content,
                "context": item.get("context", ""),
                "session": session_id
            })
            existing_content.add(content.lower())
            added += 1

    if added > 0:
        save_memory_file(path, data)
    return added
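

# Illustrative (hypothetical) decisions.json after one item is added:
#   {"version": "1.0",
#    "description": "Decisions extracted from sessions",
#    "items": [{"id": "<uuid4>", "date": "2026-01-03",
#               "content": "Use PostgreSQL for the database",
#               "context": "Chosen for JSONB support",
#               "session": "<session-id>"}]}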


def update_history_index(session_id: str, transcript_path: str, topics: list[str]) -> None:
    """Mark the session as summarized in the history index."""
    if not HISTORY_INDEX.exists():
        log(f"History index not found: {HISTORY_INDEX}")
        return
    try:
        with open(HISTORY_INDEX) as f:
            data = json.load(f)
        # Find and update the session
        for session in data.get("sessions", []):
            if session.get("id") == session_id:
                session["summarized"] = True
                session["transcript_path"] = transcript_path
                session["topics"] = topics[:5]  # Limit topics
                session["summarized_at"] = datetime.now().isoformat()
                break
        with open(HISTORY_INDEX, "w") as f:
            json.dump(data, f, indent=2)
        log(f"Updated history index for session {session_id}")
    except Exception as e:
        log(f"Error updating history index: {e}")


def main():
    parser = argparse.ArgumentParser(description="Summarize a Claude Code session")
    parser.add_argument("--session-id", required=True, help="Session ID")
    parser.add_argument("--transcript", required=True, help="Path to transcript file")
    parser.add_argument("--reason", default="", help="Session end reason")
    args = parser.parse_args()

    log(f"Starting summarization for session {args.session_id}")

    # Load the transcript
    transcript = load_transcript(args.transcript)
    if not transcript:
        log("Empty or invalid transcript, skipping")
        return

    # Check the threshold
    user_msg_count = count_user_messages(transcript)
    log(f"Found {user_msg_count} user messages")
    if user_msg_count < MIN_USER_MESSAGES:
        log(f"Session below threshold ({MIN_USER_MESSAGES}), marking as summarized without extraction")
        update_history_index(args.session_id, args.transcript, ["trivial"])
        return

    # Extract conversation text
    conversation_text = extract_conversation_text(transcript)

    # Heuristic extraction (always runs)
    log("Running heuristic extraction...")
    heuristic_results = heuristic_extraction(transcript)

    # LLM extraction (for substantive sessions)
    log("Running LLM extraction...")
    llm_results = llm_extraction(conversation_text, args.session_id)

    # Combine results
    all_results = {
        "decisions": llm_results.get("decisions", []),
        "preferences": llm_results.get("preferences", []),
        "projects": heuristic_results.get("projects", []),
        "facts": heuristic_results.get("facts", []),
    }

    # Save to memory files
    total_added = 0
    topics = []
    for category, items in all_results.items():
        count = add_to_memory(category, items, args.session_id)
        total_added += count
        if count > 0:
            topics.append(category)
            log(f"Added {count} items to {category}")

    # Update the history index
    update_history_index(args.session_id, args.transcript, topics)
    log(f"Summarization complete: {total_added} total items added")


if __name__ == "__main__":
    main()
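
The summarizer can also be run directly against a saved transcript, bypassing
the hook (a minimal sketch; the placeholders are hypothetical, and the memory
files only appear under the state dir once something has been extracted):

    python3 ~/.claude/hooks/scripts/summarize-transcript.py \
        --session-id <session-id> \
        --transcript <path-to-transcript>.jsonl \
        --reason exit
    cat ~/.claude/state/personal-assistant/memory/facts.json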