Add search command, history browser, install script, and systemd timers
- /search command to search across memory, history, and configuration
- history-browser.py for browsing and analyzing session history
- install.sh for first-time setup with directory creation and validation
- daily-maintenance.sh for scheduled backup, cleanup, and validation
- systemd timer units for automated daily maintenance at 6 AM
- Updated shell completions with 11 aliases
- Test suite now covers 19 tests
- Bump version to 1.1.0

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
286
automation/search.py
Executable file
286
automation/search.py
Executable file
@@ -0,0 +1,286 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Search across PA memory, history, and configuration.
|
||||
Usage: python3 search.py [--memory|--history|--config|--recent [days]] <query>
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
CLAUDE_DIR = Path.home() / ".claude"
|
||||
STATE_DIR = CLAUDE_DIR / "state"
|
||||
PA_DIR = STATE_DIR / "personal-assistant"
|
||||
MEMORY_DIR = PA_DIR / "memory"
|
||||
HISTORY_DIR = PA_DIR / "history"
|
||||
|
||||
|
||||
def load_json(path: Path) -> Optional[Dict]:
    """Load and parse a JSON file.

    Args:
        path: Path to the JSON file.

    Returns:
        The parsed object, or None if the file is missing, unreadable,
        or contains invalid JSON.
    """
    try:
        # Explicit encoding so behavior doesn't depend on the platform locale.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, json.JSONDecodeError):
        # OSError covers FileNotFoundError plus permission/IO errors;
        # any unreadable file is treated the same as a missing one.
        return None
|
||||
|
||||
|
||||
def search_memory(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search memory files and active general instructions.

    Args:
        query: Regular expression to search for. If it is not a valid
            regex (e.g. "foo("), it is treated as a literal string
            instead of raising.
        case_insensitive: Match case-insensitively (default True).

    Returns:
        Result dicts with source, date, content, context, and id keys.
    """
    results: List[Dict] = []
    flags = re.IGNORECASE if case_insensitive else 0
    try:
        pattern = re.compile(query, flags)
    except re.error:
        # User queries are often plain text, not regexes; fall back to a
        # literal-string search rather than crashing on invalid syntax.
        pattern = re.compile(re.escape(query), flags)

    memory_files = ["preferences.json", "decisions.json", "projects.json", "facts.json"]

    for filename in memory_files:
        data = load_json(MEMORY_DIR / filename)
        if not data or "items" not in data:
            continue

        category = filename.replace(".json", "")
        for item in data["items"]:
            content = item.get("content", "")
            context = item.get("context", "")

            if pattern.search(content) or pattern.search(context):
                results.append({
                    "source": f"memory/{category}",
                    "date": item.get("date", "unknown"),
                    "content": content,
                    "context": context,
                    "id": item.get("id", "")
                })

    # Also search general-instructions.json (active instructions only).
    gi_data = load_json(PA_DIR / "general-instructions.json")
    if gi_data and "instructions" in gi_data:
        for item in gi_data["instructions"]:
            if item.get("status") != "active":
                continue
            content = item.get("instruction", "")
            if pattern.search(content):
                results.append({
                    "source": "memory/general-instructions",
                    "date": item.get("created", "unknown"),
                    "content": content,
                    "context": "",
                    "id": item.get("id", "")
                })

    return results
|
||||
|
||||
|
||||
def search_history(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search session summaries and topics in the history index.

    Args:
        query: Regular expression to search for. If it is not a valid
            regex, it is treated as a literal string instead of raising.
        case_insensitive: Match case-insensitively (default True).

    Returns:
        Result dicts with source, date, session_id, content, and topics.
    """
    results: List[Dict] = []
    flags = re.IGNORECASE if case_insensitive else 0
    try:
        pattern = re.compile(query, flags)
    except re.error:
        # Fall back to a literal-string search on invalid regex syntax.
        pattern = re.compile(re.escape(query), flags)

    index_data = load_json(HISTORY_DIR / "index.json")
    if not index_data or "sessions" not in index_data:
        return results

    for session in index_data["sessions"]:
        session_id = session.get("id", "")
        summary = session.get("summary", "")
        topics = session.get("topics", [])
        date = session.get("date", "unknown")

        # Search in summary and topics
        topics_str = " ".join(topics) if topics else ""
        if pattern.search(summary) or pattern.search(topics_str):
            results.append({
                "source": "history",
                "date": date,
                "session_id": session_id,
                "content": summary,
                "topics": topics
            })

    return results
|
||||
|
||||
|
||||
def search_config(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search through configuration files.

    Args:
        query: Regular expression to search for. If it is not a valid
            regex, it is treated as a literal string instead of raising.
        case_insensitive: Match case-insensitively (default True).

    Returns:
        Result dicts with source, a sample of matching lines (up to 5),
        and the total match count per file.
    """
    results: List[Dict] = []
    flags = re.IGNORECASE if case_insensitive else 0
    try:
        pattern = re.compile(query, flags)
    except re.error:
        # Fall back to a literal-string search on invalid regex syntax.
        pattern = re.compile(re.escape(query), flags)

    config_files = [
        STATE_DIR / "component-registry.json",
        STATE_DIR / "system-instructions.json",
        STATE_DIR / "autonomy-levels.json",
        STATE_DIR / "model-policy.json",
        STATE_DIR / "kb.json",
        PA_DIR / "kb.json",
    ]

    for config_path in config_files:
        if not config_path.exists():
            continue

        try:
            # errors="replace" lets a file with stray bytes still be
            # searched instead of being skipped on a decode error.
            content = config_path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Unreadable (permissions, race with deletion): best-effort skip.
            continue

        if not pattern.search(content):
            continue

        # Collect the individual matching lines for display.
        matches = [
            f"L{i}: {line.strip()[:100]}"
            for i, line in enumerate(content.split('\n'), 1)
            if pattern.search(line)
        ]
        results.append({
            "source": f"config/{config_path.name}",
            "matches": matches[:5],  # Limit to 5 matches per file
            "total_matches": len(matches)
        })

    return results
|
||||
|
||||
|
||||
def get_recent_items(days: int = 7) -> List[Dict]:
    """Return memory items and sessions dated within the last N days.

    Args:
        days: Lookback window in days (default 7).

    Returns:
        Result dicts (memory items and sessions) sorted newest-first by
        their date string. Items with missing or unparseable dates are
        skipped.
    """
    results: List[Dict] = []
    cutoff = datetime.now() - timedelta(days=days)

    # Check memory files
    memory_files = ["preferences.json", "decisions.json", "projects.json", "facts.json"]

    for filename in memory_files:
        data = load_json(MEMORY_DIR / filename)
        if not data or "items" not in data:
            continue

        category = filename.replace(".json", "")
        for item in data["items"]:
            date_str = item.get("date", "")
            try:
                # [:10] tolerates full ISO timestamps ("YYYY-MM-DDT..."),
                # mirroring the session-date parsing below; previously
                # such memory items were silently dropped.
                item_date = datetime.strptime(date_str[:10], "%Y-%m-%d")
            except ValueError:
                continue
            if item_date >= cutoff:
                results.append({
                    "source": f"memory/{category}",
                    "date": date_str,
                    "content": item.get("content", ""),
                    "type": "memory"
                })

    # Check history
    index_data = load_json(HISTORY_DIR / "index.json")
    if index_data and "sessions" in index_data:
        for session in index_data["sessions"]:
            date_str = session.get("date", "")
            try:
                session_date = datetime.strptime(date_str[:10], "%Y-%m-%d")
            except ValueError:
                continue
            if session_date >= cutoff:
                results.append({
                    "source": "history",
                    "date": date_str,
                    "content": session.get("summary", "No summary"),
                    "type": "session",
                    "session_id": session.get("id", "")
                })

    # Sort by date, newest first
    results.sort(key=lambda x: x.get("date", ""), reverse=True)
    return results
|
||||
|
||||
|
||||
def format_results(results: List[Dict], search_type: str) -> str:
    """Render search results as a human-readable report.

    Config results list sample matching lines, history results show the
    session summary and topics, and memory results show content with
    optional context. Returns a short "no results" line when empty.
    """
    if not results:
        return f"No results found in {search_type}.\n"

    out = [f"\n=== {search_type.title()} Results ({len(results)}) ===\n"]

    for entry in results:
        src = entry.get("source", "unknown")
        when = entry.get("date", "")

        if "matches" in entry:
            # Config-file hit: show the (already truncated) match sample.
            out.append(f"📄 {src}")
            out.append(f"   {entry.get('total_matches', 0)} matches found:")
            out.extend(f"      {m}" for m in entry.get("matches", []))
        elif "session_id" in entry:
            # Session-history hit.
            out.append(f"📜 {when[:10]} - Session: {entry.get('session_id', '')[:8]}...")
            out.append(f"   {entry.get('content', '')[:200]}")
            if entry.get("topics"):
                out.append(f"   Topics: {', '.join(entry['topics'][:5])}")
        else:
            # Memory hit.
            out.append(f"💾 [{src}] {when}")
            out.append(f"   {entry.get('content', '')[:200]}")
            if entry.get("context"):
                out.append(f"   Context: {entry.get('context', '')[:100]}")

        out.append("")

    return "\n".join(out)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments and dispatch to the searchers.

    Returns:
        0 on success, 1 on usage error (search requested without a query).
    """
    parser = argparse.ArgumentParser(
        description="Search PA memory, history, and configuration"
    )
    parser.add_argument("query", nargs="*", help="Search query")
    parser.add_argument("--memory", action="store_true", help="Search only memory")
    parser.add_argument("--history", action="store_true", help="Search only history")
    parser.add_argument("--config", action="store_true", help="Search only config")
    parser.add_argument("--recent", type=int, nargs="?", const=7,
                        help="Show recent items (default: 7 days)")
    # Kept for backward compatibility; insensitive matching is the default,
    # so this flag is a no-op unless --case-sensitive is also added.
    parser.add_argument("-i", "--case-insensitive", action="store_true", default=True,
                        help="Case insensitive search (default)")
    parser.add_argument("-s", "--case-sensitive", action="store_true",
                        help="Match case exactly")

    args = parser.parse_args()
    # Previously args.case_insensitive was parsed but never used; now the
    # effective sensitivity is threaded through to every searcher.
    case_insensitive = not args.case_sensitive

    # Handle --recent: list recent items and exit (no query needed).
    if args.recent is not None:
        results = get_recent_items(args.recent)
        print(f"\n=== Items from last {args.recent} days ({len(results)}) ===\n")
        for r in results:
            print(f"[{r['date'][:10]}] {r['source']}: {r['content'][:100]}...")
        return 0

    # Need query for search
    if not args.query:
        parser.print_help()
        return 1

    query = " ".join(args.query)

    # No scope flag given means search everything.
    search_all = not (args.memory or args.history or args.config)

    output = []

    if search_all or args.memory:
        results = search_memory(query, case_insensitive)
        output.append(format_results(results, "memory"))

    if search_all or args.history:
        results = search_history(query, case_insensitive)
        output.append(format_results(results, "history"))

    if search_all or args.config:
        results = search_config(query, case_insensitive)
        output.append(format_results(results, "config"))

    print(f"\n🔍 Search: '{query}'")
    print("".join(output))

    return 0
|
||||
|
||||
|
||||
# Script entry point: exit with main()'s status code (0 success, 1 usage error).
if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user