Files
claude-code/automation/search.py
OpenCode Test 125bb4904b Add search command, history browser, install script, and systemd timers
- /search command to search across memory, history, and configuration
- history-browser.py for browsing and analyzing session history
- install.sh for first-time setup with directory creation and validation
- daily-maintenance.sh for scheduled backup, cleanup, and validation
- systemd timer units for automated daily maintenance at 6 AM
- Updated shell completions with 11 aliases
- Test suite now covers 19 tests
- Bump version to 1.1.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-01 18:41:07 -08:00

287 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Search across PA memory, history, and configuration.
Usage:
    python3 search.py [--memory] [--history] [--config] <query>
    python3 search.py --recent [days]
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
# Root of the per-user Claude directory tree.
CLAUDE_DIR = Path.home().joinpath(".claude")
# Shared state files (the configuration JSON files searched by this script).
STATE_DIR = CLAUDE_DIR.joinpath("state")
# Personal-assistant state: general instructions, memory, and history.
PA_DIR = STATE_DIR.joinpath("personal-assistant")
# Per-category memory files (preferences, decisions, projects, facts).
MEMORY_DIR = PA_DIR.joinpath("memory")
# Session history; index.json in this directory lists the sessions.
HISTORY_DIR = PA_DIR.joinpath("history")
def load_json(path: Path) -> Optional[Dict]:
    """Load and decode a JSON file, returning None when that is not possible.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The decoded JSON value, or None if the file is missing, cannot be
        opened or read, is not valid UTF-8, or does not contain valid JSON.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, UnicodeDecodeError, json.JSONDecodeError):
        # OSError covers a missing file as well as permission problems and
        # directories; callers treat every one of these as "no data".
        return None
def search_memory(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Find memory entries and active general instructions matching ``query``.

    ``query`` is compiled as a regular expression. Each memory category file
    is searched in its items' ``content`` and ``context`` fields; instructions
    from general-instructions.json are searched in their ``instruction`` text
    and only when their status is "active".

    Args:
        query: Regular expression to look for.
        case_insensitive: Ignore case while matching (default True).

    Returns:
        One dict per hit with the keys ``source``, ``date``, ``content``,
        ``context`` and ``id``.
    """
    matcher = re.compile(query, re.IGNORECASE if case_insensitive else 0)
    hits: List[Dict] = []

    for filename in ("preferences.json", "decisions.json", "projects.json", "facts.json"):
        payload = load_json(MEMORY_DIR / filename)
        if payload and "items" in payload:
            # The category label is the file name without its extension.
            origin = "memory/" + Path(filename).stem
            for entry in payload["items"]:
                text = entry.get("content", "")
                note = entry.get("context", "")
                if not (matcher.search(text) or matcher.search(note)):
                    continue
                hits.append({
                    "source": origin,
                    "date": entry.get("date", "unknown"),
                    "content": text,
                    "context": note,
                    "id": entry.get("id", ""),
                })

    # General instructions live one level up, next to the memory directory.
    general = load_json(PA_DIR / "general-instructions.json")
    if not general or "instructions" not in general:
        return hits

    for entry in general["instructions"]:
        if entry.get("status") == "active":
            text = entry.get("instruction", "")
            if matcher.search(text):
                hits.append({
                    "source": "memory/general-instructions",
                    "date": entry.get("created", "unknown"),
                    "content": text,
                    "context": "",
                    "id": entry.get("id", ""),
                })
    return hits
def search_history(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Find sessions in the history index whose summary or topics match.

    ``query`` is compiled as a regular expression and tested against each
    session's ``summary`` and against its ``topics`` joined by spaces.

    Args:
        query: Regular expression to look for.
        case_insensitive: Ignore case while matching (default True).

    Returns:
        One dict per matching session with the keys ``source``, ``date``,
        ``session_id``, ``content`` (the summary) and ``topics``. Empty when
        the index cannot be loaded or has no ``sessions`` key.
    """
    matcher = re.compile(query, re.IGNORECASE if case_insensitive else 0)

    index = load_json(HISTORY_DIR / "index.json")
    if not index or "sessions" not in index:
        return []

    hits: List[Dict] = []
    for entry in index["sessions"]:
        summary = entry.get("summary", "")
        topics = entry.get("topics", [])
        # Topics are flattened to one string so a single search covers them.
        topic_text = " ".join(topics) if topics else ""
        if not (matcher.search(summary) or matcher.search(topic_text)):
            continue
        hits.append({
            "source": "history",
            "date": entry.get("date", "unknown"),
            "session_id": entry.get("id", ""),
            "content": summary,
            "topics": topics,
        })
    return hits
def search_config(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search the known configuration files line by line.

    ``query`` is compiled as a regular expression and tested against every
    line of each configuration file. Files that are missing, unreadable, or
    not valid UTF-8 are skipped.

    Args:
        query: Regular expression to look for.
        case_insensitive: Ignore case while matching (default True).

    Returns:
        One dict per file with at least one matching line, holding
        ``source`` ("config/<file name>"), ``matches`` (up to five
        "L<line number>: <text>" strings, text trimmed to 100 characters)
        and ``total_matches`` (the number of matching lines).
    """
    pattern = re.compile(query, re.IGNORECASE if case_insensitive else 0)
    config_files = [
        STATE_DIR / "component-registry.json",
        STATE_DIR / "system-instructions.json",
        STATE_DIR / "autonomy-levels.json",
        STATE_DIR / "model-policy.json",
        STATE_DIR / "kb.json",
        PA_DIR / "kb.json",
    ]

    results = []
    for config_path in config_files:
        try:
            content = config_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Best-effort search: a file that is absent or cannot be read
            # simply contributes no results. Only I/O and decoding problems
            # are swallowed so that genuine bugs still surface.
            continue

        matches = [
            f"L{i}: {line.strip()[:100]}"
            for i, line in enumerate(content.split('\n'), 1)
            if pattern.search(line)
        ]
        if not matches:
            # Also covers patterns that only match across a line break,
            # which would otherwise be reported as a hit with zero matches.
            continue

        results.append({
            "source": f"config/{config_path.name}",
            "matches": matches[:5],  # Limit to 5 matches per file
            "total_matches": len(matches),
        })
    return results
def get_recent_items(days: int = 7) -> List[Dict]:
    """Collect memory items and history sessions dated within ``days`` days.

    An entry qualifies when midnight of its date is at or after
    ``datetime.now() - timedelta(days=days)``. Only the leading
    ``YYYY-MM-DD`` part of a date is parsed, for memory items and sessions
    alike, so a trailing time component is accepted. Entries whose date is
    missing, not a string, or not parseable are skipped.

    Args:
        days: Size of the look-back window in days (default 7).

    Returns:
        Dicts with the keys ``source``, ``date``, ``content`` and ``type``
        ("memory" or "session"); session entries also carry ``session_id``.
        The list is sorted by the ``date`` string, newest first.
    """
    cutoff = datetime.now() - timedelta(days=days)

    def is_recent(date_value) -> bool:
        # A JSON null or other non-string date cannot be parsed; treat it
        # like any other malformed date instead of raising TypeError.
        if not isinstance(date_value, str):
            return False
        try:
            return datetime.strptime(date_value[:10], "%Y-%m-%d") >= cutoff
        except ValueError:
            return False

    results = []

    # Check memory files
    memory_files = ["preferences.json", "decisions.json", "projects.json", "facts.json"]
    for filename in memory_files:
        data = load_json(MEMORY_DIR / filename)
        if not data or "items" not in data:
            continue
        category = filename.replace(".json", "")
        for item in data["items"]:
            date_str = item.get("date", "")
            if is_recent(date_str):
                results.append({
                    "source": f"memory/{category}",
                    "date": date_str,
                    "content": item.get("content", ""),
                    "type": "memory",
                })

    # Check history
    index_data = load_json(HISTORY_DIR / "index.json")
    if index_data and "sessions" in index_data:
        for session in index_data["sessions"]:
            date_str = session.get("date", "")
            if is_recent(date_str):
                results.append({
                    "source": "history",
                    "date": date_str,
                    "content": session.get("summary", "No summary"),
                    "type": "session",
                    "session_id": session.get("id", ""),
                })

    # Sort by date, newest first. Every stored date is a string that starts
    # with YYYY-MM-DD, so plain string ordering is chronological by day.
    results.sort(key=lambda entry: entry["date"], reverse=True)
    return results
def format_results(results: List[Dict], search_type: str) -> str:
    """Render search hits as a text block for the terminal.

    The layout of each hit depends on its shape: a hit with a ``matches``
    key is shown as a configuration file with its matching lines, a hit with
    a ``session_id`` key as a history session, and anything else as a memory
    entry. Every hit is followed by an empty line.

    Args:
        results: Hits as produced by the search functions.
        search_type: Name of the searched area, used in the heading.

    Returns:
        The formatted text, or a "No results found" line when ``results``
        is empty.
    """
    if not results:
        return f"No results found in {search_type}.\n"

    out = [f"\n=== {search_type.title()} Results ({len(results)}) ===\n"]
    for hit in results:
        origin = hit.get("source", "unknown")
        when = hit.get("date", "")

        if "matches" in hit:
            # Configuration file: match count followed by the matched lines.
            out.append(f"📄 {origin}")
            out.append(f" {hit.get('total_matches', 0)} matches found:")
            out.extend(f" {found}" for found in hit.get("matches", []))
        elif "session_id" in hit:
            # History session: day, shortened session id, summary, topics.
            out.append(f"📜 {when[:10]} - Session: {hit.get('session_id', '')[:8]}...")
            out.append(f" {hit.get('content', '')[:200]}")
            if hit.get("topics"):
                out.append(f" Topics: {', '.join(hit['topics'][:5])}")
        else:
            # Memory entry: source and date, content, optional context.
            out.append(f"💾 [{origin}] {when}")
            out.append(f" {hit.get('content', '')[:200]}")
            if hit.get("context"):
                out.append(f" Context: {hit.get('context', '')[:100]}")

        out.append("")

    return "\n".join(out)
def main():
    """Command-line entry point: parse arguments, run searches, print results.

    With ``--recent`` the recent items are listed and any query is ignored.
    Otherwise the query words are joined with spaces and searched in memory,
    history and configuration, or only in the areas selected by flags.

    Returns:
        Exit status: 0 on success, 1 when no query was given (the help text
        is printed), 2 when the query is not a valid regular expression.
    """
    parser = argparse.ArgumentParser(
        description="Search PA memory, history, and configuration"
    )
    parser.add_argument("query", nargs="*", help="Search query")
    parser.add_argument("--memory", action="store_true", help="Search only memory")
    parser.add_argument("--history", action="store_true", help="Search only history")
    parser.add_argument("--config", action="store_true", help="Search only config")
    parser.add_argument("--recent", type=int, nargs="?", const=7,
                        help="Show recent items (default: 7 days)")
    parser.add_argument("-i", "--case-insensitive", action="store_true", default=True,
                        help="Case insensitive search (default)")
    # -i is store_true with a True default, so on its own it can never turn
    # case-insensitive matching off; -s writes False to the same destination.
    parser.add_argument("-s", "--case-sensitive", dest="case_insensitive",
                        action="store_false", help="Case sensitive search")
    args = parser.parse_args()

    # Handle --recent
    if args.recent is not None:
        results = get_recent_items(args.recent)
        print(f"\n=== Items from last {args.recent} days ({len(results)}) ===\n")
        for r in results:
            print(f"[{r['date'][:10]}] {r['source']}: {r['content'][:100]}...")
        return 0

    # Need query for search
    if not args.query:
        parser.print_help()
        return 1
    query = " ".join(args.query)

    # With no scope flag every area is searched; otherwise only the chosen ones.
    search_all = not (args.memory or args.history or args.config)
    searchers = (
        ("memory", args.memory, search_memory),
        ("history", args.history, search_history),
        ("config", args.config, search_config),
    )

    output = []
    try:
        for label, selected, search in searchers:
            if search_all or selected:
                results = search(query, case_insensitive=args.case_insensitive)
                output.append(format_results(results, label))
    except re.error as exc:
        # The query is compiled as a regular expression; report a malformed
        # one as a usage problem instead of showing a traceback.
        print(f"Invalid search pattern '{query}': {exc}", file=sys.stderr)
        return 2

    print(f"\n🔍 Search: '{query}'")
    print("".join(output))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())