- /search command to search across memory, history, and configuration - history-browser.py for browsing and analyzing session history - install.sh for first-time setup with directory creation and validation - daily-maintenance.sh for scheduled backup, cleanup, and validation - systemd timer units for automated daily maintenance at 6 AM - Updated shell completions with 11 aliases - Test suite now covers 19 tests - Bump version to 1.1.0 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
287 lines
9.3 KiB
Python
Executable File
287 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Search across PA memory, history, and configuration.
|
|
Usage: python3 search.py [--memory|--history|--config|--recent [days]] <query>
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
CLAUDE_DIR = Path.home() / ".claude"
|
|
STATE_DIR = CLAUDE_DIR / "state"
|
|
PA_DIR = STATE_DIR / "personal-assistant"
|
|
MEMORY_DIR = PA_DIR / "memory"
|
|
HISTORY_DIR = PA_DIR / "history"
|
|
|
|
|
|
def load_json(path: Path) -> Optional[Dict]:
|
|
"""Load JSON file safely."""
|
|
try:
|
|
with open(path) as f:
|
|
return json.load(f)
|
|
except (FileNotFoundError, json.JSONDecodeError):
|
|
return None
|
|
|
|
|
|
def search_memory(query: str, case_insensitive: bool = True) -> List[Dict]:
|
|
"""Search through memory files."""
|
|
results = []
|
|
pattern = re.compile(query, re.IGNORECASE if case_insensitive else 0)
|
|
|
|
memory_files = ["preferences.json", "decisions.json", "projects.json", "facts.json"]
|
|
|
|
for filename in memory_files:
|
|
data = load_json(MEMORY_DIR / filename)
|
|
if not data or "items" not in data:
|
|
continue
|
|
|
|
category = filename.replace(".json", "")
|
|
for item in data["items"]:
|
|
content = item.get("content", "")
|
|
context = item.get("context", "")
|
|
|
|
if pattern.search(content) or pattern.search(context):
|
|
results.append({
|
|
"source": f"memory/{category}",
|
|
"date": item.get("date", "unknown"),
|
|
"content": content,
|
|
"context": context,
|
|
"id": item.get("id", "")
|
|
})
|
|
|
|
# Also search general-instructions.json
|
|
gi_data = load_json(PA_DIR / "general-instructions.json")
|
|
if gi_data and "instructions" in gi_data:
|
|
for item in gi_data["instructions"]:
|
|
if item.get("status") != "active":
|
|
continue
|
|
content = item.get("instruction", "")
|
|
if pattern.search(content):
|
|
results.append({
|
|
"source": "memory/general-instructions",
|
|
"date": item.get("created", "unknown"),
|
|
"content": content,
|
|
"context": "",
|
|
"id": item.get("id", "")
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
def search_history(query: str, case_insensitive: bool = True) -> List[Dict]:
|
|
"""Search through session history."""
|
|
results = []
|
|
pattern = re.compile(query, re.IGNORECASE if case_insensitive else 0)
|
|
|
|
index_path = HISTORY_DIR / "index.json"
|
|
index_data = load_json(index_path)
|
|
|
|
if not index_data or "sessions" not in index_data:
|
|
return results
|
|
|
|
for session in index_data["sessions"]:
|
|
session_id = session.get("id", "")
|
|
summary = session.get("summary", "")
|
|
topics = session.get("topics", [])
|
|
date = session.get("date", "unknown")
|
|
|
|
# Search in summary and topics
|
|
topics_str = " ".join(topics) if topics else ""
|
|
if pattern.search(summary) or pattern.search(topics_str):
|
|
results.append({
|
|
"source": "history",
|
|
"date": date,
|
|
"session_id": session_id,
|
|
"content": summary,
|
|
"topics": topics
|
|
})
|
|
|
|
return results
|
|
|
|
|
|
def search_config(query: str, case_insensitive: bool = True) -> List[Dict]:
|
|
"""Search through configuration files."""
|
|
results = []
|
|
pattern = re.compile(query, re.IGNORECASE if case_insensitive else 0)
|
|
|
|
config_files = [
|
|
STATE_DIR / "component-registry.json",
|
|
STATE_DIR / "system-instructions.json",
|
|
STATE_DIR / "autonomy-levels.json",
|
|
STATE_DIR / "model-policy.json",
|
|
STATE_DIR / "kb.json",
|
|
PA_DIR / "kb.json",
|
|
]
|
|
|
|
for config_path in config_files:
|
|
if not config_path.exists():
|
|
continue
|
|
|
|
try:
|
|
content = config_path.read_text()
|
|
if pattern.search(content):
|
|
# Find matching lines
|
|
matches = []
|
|
for i, line in enumerate(content.split('\n'), 1):
|
|
if pattern.search(line):
|
|
matches.append(f"L{i}: {line.strip()[:100]}")
|
|
|
|
results.append({
|
|
"source": f"config/{config_path.name}",
|
|
"matches": matches[:5], # Limit to 5 matches per file
|
|
"total_matches": len(matches)
|
|
})
|
|
except Exception:
|
|
continue
|
|
|
|
return results
|
|
|
|
|
|
def get_recent_items(days: int = 7) -> List[Dict]:
|
|
"""Get items from the last N days."""
|
|
results = []
|
|
cutoff = datetime.now() - timedelta(days=days)
|
|
|
|
# Check memory files
|
|
memory_files = ["preferences.json", "decisions.json", "projects.json", "facts.json"]
|
|
|
|
for filename in memory_files:
|
|
data = load_json(MEMORY_DIR / filename)
|
|
if not data or "items" not in data:
|
|
continue
|
|
|
|
category = filename.replace(".json", "")
|
|
for item in data["items"]:
|
|
date_str = item.get("date", "")
|
|
try:
|
|
item_date = datetime.strptime(date_str, "%Y-%m-%d")
|
|
if item_date >= cutoff:
|
|
results.append({
|
|
"source": f"memory/{category}",
|
|
"date": date_str,
|
|
"content": item.get("content", ""),
|
|
"type": "memory"
|
|
})
|
|
except ValueError:
|
|
continue
|
|
|
|
# Check history
|
|
index_data = load_json(HISTORY_DIR / "index.json")
|
|
if index_data and "sessions" in index_data:
|
|
for session in index_data["sessions"]:
|
|
date_str = session.get("date", "")
|
|
try:
|
|
session_date = datetime.strptime(date_str[:10], "%Y-%m-%d")
|
|
if session_date >= cutoff:
|
|
results.append({
|
|
"source": "history",
|
|
"date": date_str,
|
|
"content": session.get("summary", "No summary"),
|
|
"type": "session",
|
|
"session_id": session.get("id", "")
|
|
})
|
|
except ValueError:
|
|
continue
|
|
|
|
# Sort by date, newest first
|
|
results.sort(key=lambda x: x.get("date", ""), reverse=True)
|
|
return results
|
|
|
|
|
|
def format_results(results: List[Dict], search_type: str) -> str:
|
|
"""Format search results for display."""
|
|
if not results:
|
|
return f"No results found in {search_type}.\n"
|
|
|
|
lines = [f"\n=== {search_type.title()} Results ({len(results)}) ===\n"]
|
|
|
|
for r in results:
|
|
source = r.get("source", "unknown")
|
|
date = r.get("date", "")
|
|
|
|
if "matches" in r:
|
|
# Config result
|
|
lines.append(f"📄 {source}")
|
|
lines.append(f" {r.get('total_matches', 0)} matches found:")
|
|
for match in r.get("matches", []):
|
|
lines.append(f" {match}")
|
|
elif "session_id" in r:
|
|
# History result
|
|
lines.append(f"📜 {date[:10]} - Session: {r.get('session_id', '')[:8]}...")
|
|
lines.append(f" {r.get('content', '')[:200]}")
|
|
if r.get("topics"):
|
|
lines.append(f" Topics: {', '.join(r['topics'][:5])}")
|
|
else:
|
|
# Memory result
|
|
lines.append(f"💾 [{source}] {date}")
|
|
lines.append(f" {r.get('content', '')[:200]}")
|
|
if r.get("context"):
|
|
lines.append(f" Context: {r.get('context', '')[:100]}")
|
|
|
|
lines.append("")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Search PA memory, history, and configuration"
|
|
)
|
|
parser.add_argument("query", nargs="*", help="Search query")
|
|
parser.add_argument("--memory", action="store_true", help="Search only memory")
|
|
parser.add_argument("--history", action="store_true", help="Search only history")
|
|
parser.add_argument("--config", action="store_true", help="Search only config")
|
|
parser.add_argument("--recent", type=int, nargs="?", const=7,
|
|
help="Show recent items (default: 7 days)")
|
|
parser.add_argument("-i", "--case-insensitive", action="store_true", default=True,
|
|
help="Case insensitive search (default)")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Handle --recent
|
|
if args.recent is not None:
|
|
results = get_recent_items(args.recent)
|
|
print(f"\n=== Items from last {args.recent} days ({len(results)}) ===\n")
|
|
for r in results:
|
|
print(f"[{r['date'][:10]}] {r['source']}: {r['content'][:100]}...")
|
|
return 0
|
|
|
|
# Need query for search
|
|
if not args.query:
|
|
parser.print_help()
|
|
return 1
|
|
|
|
query = " ".join(args.query)
|
|
|
|
# Determine what to search
|
|
search_all = not (args.memory or args.history or args.config)
|
|
|
|
output = []
|
|
|
|
if search_all or args.memory:
|
|
results = search_memory(query)
|
|
output.append(format_results(results, "memory"))
|
|
|
|
if search_all or args.history:
|
|
results = search_history(query)
|
|
output.append(format_results(results, "history"))
|
|
|
|
if search_all or args.config:
|
|
results = search_config(query)
|
|
output.append(format_results(results, "config"))
|
|
|
|
print(f"\n🔍 Search: '{query}'")
|
|
print("".join(output))
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|