#!/usr/bin/env python3
"""
Search across PA memory, history, and configuration.

Usage:
    python3 search.py <query> [--memory|--history|--config] [-s]
    python3 search.py --recent [days]

The query is treated as a regular expression; if it is not a valid
pattern it is searched for literally instead.
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional

CLAUDE_DIR = Path.home() / ".claude"
STATE_DIR = CLAUDE_DIR / "state"
PA_DIR = STATE_DIR / "personal-assistant"
MEMORY_DIR = PA_DIR / "memory"
HISTORY_DIR = PA_DIR / "history"

# Memory category files read from MEMORY_DIR; each is read as {"items": [...]}.
MEMORY_FILES = ("preferences.json", "decisions.json", "projects.json", "facts.json")


def _compile_query(query: str, case_insensitive: bool = True) -> "re.Pattern[str]":
    """Compile *query* as a regex, falling back to a literal match if invalid."""
    flags = re.IGNORECASE if case_insensitive else 0
    try:
        return re.compile(query, flags)
    except re.error:
        # Queries such as "foo(" are not valid patterns; search them verbatim
        # rather than aborting with a traceback.
        return re.compile(re.escape(query), flags)


def _text(value: object) -> str:
    """Coerce a JSON value to a string; ``None`` becomes the empty string."""
    if value is None:
        return ""
    return value if isinstance(value, str) else str(value)


def _entries(data: Optional[Dict], key: str) -> List[Dict]:
    """Return the dict elements of ``data[key]``, or ``[]`` if absent/malformed."""
    if not data:
        return []
    entries = data.get(key)
    if not isinstance(entries, list):
        return []
    return [entry for entry in entries if isinstance(entry, dict)]


def _parse_day(value: object) -> Optional[datetime]:
    """Parse the leading ``YYYY-MM-DD`` of *value*; return ``None`` if unparsable."""
    if not isinstance(value, str):
        return None
    try:
        # Only the first 10 characters are used so full timestamps are accepted.
        return datetime.strptime(value[:10], "%Y-%m-%d")
    except ValueError:
        return None


def load_json(path: Path) -> Optional[Dict]:
    """Load a JSON object from *path* safely.

    Returns the parsed dict, or ``None`` when the file is missing or
    unreadable, is not valid UTF-8 JSON, or its top level is not an object.
    """
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return None
    return data if isinstance(data, dict) else None


def search_memory(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search through memory files and active general instructions.

    Each memory item is matched on its ``content`` and ``context`` fields;
    general instructions are matched on ``instruction`` and only when their
    ``status`` is ``"active"``.

    Returns a list of dicts with keys ``source``, ``date``, ``content``,
    ``context`` and ``id``.
    """
    pattern = _compile_query(query, case_insensitive)
    results = []

    for filename in MEMORY_FILES:
        category = filename.replace(".json", "")
        for item in _entries(load_json(MEMORY_DIR / filename), "items"):
            content = item.get("content", "")
            context = item.get("context", "")
            if pattern.search(_text(content)) or pattern.search(_text(context)):
                results.append({
                    "source": f"memory/{category}",
                    "date": item.get("date", "unknown"),
                    "content": content,
                    "context": context,
                    "id": item.get("id", ""),
                })

    # Also search general-instructions.json
    gi_data = load_json(PA_DIR / "general-instructions.json")
    for item in _entries(gi_data, "instructions"):
        if item.get("status") != "active":
            continue
        content = item.get("instruction", "")
        if pattern.search(_text(content)):
            results.append({
                "source": "memory/general-instructions",
                "date": item.get("created", "unknown"),
                "content": content,
                "context": "",
                "id": item.get("id", ""),
            })

    return results


def search_history(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search session summaries and topics listed in the history index.

    Returns a list of dicts with keys ``source``, ``date``, ``session_id``,
    ``content`` (the session summary) and ``topics``.
    """
    pattern = _compile_query(query, case_insensitive)
    results = []

    index_data = load_json(HISTORY_DIR / "index.json")
    for session in _entries(index_data, "sessions"):
        summary = session.get("summary", "")
        topics = session.get("topics", [])

        # Topics are matched as one space-joined string.
        if isinstance(topics, (list, tuple)):
            topics_str = " ".join(_text(topic) for topic in topics)
        else:
            topics_str = _text(topics)

        if pattern.search(_text(summary)) or pattern.search(topics_str):
            results.append({
                "source": "history",
                "date": session.get("date", "unknown"),
                "session_id": session.get("id", ""),
                "content": summary,
                "topics": topics,
            })

    return results


def search_config(query: str, case_insensitive: bool = True) -> List[Dict]:
    """Search the raw text of the known configuration files.

    Returns one dict per matching file with keys ``source``, ``matches``
    (at most five ``"L<n>: <line>"`` strings, each line truncated to 100
    characters) and ``total_matches`` (the number of matching lines).
    """
    pattern = _compile_query(query, case_insensitive)
    results = []

    config_files = [
        STATE_DIR / "component-registry.json",
        STATE_DIR / "system-instructions.json",
        STATE_DIR / "autonomy-levels.json",
        STATE_DIR / "model-policy.json",
        STATE_DIR / "kb.json",
        PA_DIR / "kb.json",
    ]

    for config_path in config_files:
        try:
            content = config_path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Missing or unreadable files are skipped; search is best-effort.
            continue

        if not pattern.search(content):
            continue

        matches = [
            f"L{line_no}: {line.strip()[:100]}"
            for line_no, line in enumerate(content.split("\n"), 1)
            if pattern.search(line)
        ]
        results.append({
            "source": f"config/{config_path.name}",
            "matches": matches[:5],  # Limit to 5 matches per file
            "total_matches": len(matches),
        })

    return results


def get_recent_items(days: int = 7) -> List[Dict]:
    """Get memory items and sessions dated within the last *days* days.

    The comparison is made on calendar days: an entry dated exactly *days*
    days ago is included regardless of the current time of day. Entries
    whose date cannot be parsed are skipped.

    Returns dicts with keys ``source``, ``date``, ``content`` and ``type``
    (plus ``session_id`` for sessions), sorted by date string, newest first.
    """
    # Truncate the cutoff to midnight: stored dates parse to midnight, so a
    # time-of-day cutoff would wrongly exclude the whole boundary day.
    cutoff = (datetime.now() - timedelta(days=days)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    results = []

    # Check memory files
    for filename in MEMORY_FILES:
        category = filename.replace(".json", "")
        for item in _entries(load_json(MEMORY_DIR / filename), "items"):
            date_str = item.get("date", "")
            item_date = _parse_day(date_str)
            if item_date is None or item_date < cutoff:
                continue
            results.append({
                "source": f"memory/{category}",
                "date": date_str,
                "content": item.get("content", ""),
                "type": "memory",
            })

    # Check history
    index_data = load_json(HISTORY_DIR / "index.json")
    for session in _entries(index_data, "sessions"):
        date_str = session.get("date", "")
        session_date = _parse_day(date_str)
        if session_date is None or session_date < cutoff:
            continue
        results.append({
            "source": "history",
            "date": date_str,
            "content": session.get("summary", "No summary"),
            "type": "session",
            "session_id": session.get("id", ""),
        })

    # Sort by date, newest first
    results.sort(key=lambda entry: entry.get("date", ""), reverse=True)
    return results


def format_results(results: List[Dict], search_type: str) -> str:
    """Format search results for display.

    A result is rendered as a config hit when it has a ``matches`` key, as a
    history hit when it has a ``session_id`` key, and as a memory hit
    otherwise. Returns a "No results found" line when *results* is empty.
    """
    if not results:
        return f"No results found in {search_type}.\n"

    lines = [f"\n=== {search_type.title()} Results ({len(results)}) ===\n"]

    for r in results:
        source = r.get("source", "unknown")
        date = _text(r.get("date", ""))
        content = _text(r.get("content", ""))

        if "matches" in r:
            # Config result (U+1F4C4 "page facing up")
            lines.append(f"\U0001F4C4 {source}")
            lines.append(f" {r.get('total_matches', 0)} matches found:")
            for match in r.get("matches", []):
                lines.append(f" {match}")
        elif "session_id" in r:
            # History result (U+1F4DC "scroll")
            session_id = _text(r.get("session_id", ""))
            lines.append(f"\U0001F4DC {date[:10]} - Session: {session_id[:8]}...")
            lines.append(f" {content[:200]}")
            topics = r.get("topics")
            if topics and isinstance(topics, (list, tuple)):
                shown = ", ".join(_text(topic) for topic in topics[:5])
                lines.append(f" Topics: {shown}")
        else:
            # Memory result (U+1F4BE "floppy disk")
            lines.append(f"\U0001F4BE [{source}] {date}")
            lines.append(f" {content[:200]}")
            if r.get("context"):
                lines.append(f" Context: {_text(r.get('context', ''))[:100]}")

        lines.append("")

    return "\n".join(lines)


def main(argv: Optional[List[str]] = None) -> int:
    """Run the command-line interface.

    Args:
        argv: Argument list to parse; ``None`` (the default) uses ``sys.argv``.

    Returns:
        Process exit code: 0 on success, 1 when no query was supplied.
    """
    parser = argparse.ArgumentParser(
        description="Search PA memory, history, and configuration"
    )
    parser.add_argument("query", nargs="*", help="Search query")
    parser.add_argument("--memory", action="store_true", help="Search only memory")
    parser.add_argument("--history", action="store_true", help="Search only history")
    parser.add_argument("--config", action="store_true", help="Search only config")
    parser.add_argument("--recent", type=int, nargs="?", const=7,
                        help="Show recent items (default: 7 days)")
    parser.add_argument("-i", "--case-insensitive", action="store_true", default=True,
                        help="Case insensitive search (default)")
    # Shares its destination with -i so the default can actually be switched off.
    parser.add_argument("-s", "--case-sensitive", dest="case_insensitive",
                        action="store_false", help="Case sensitive search")

    args = parser.parse_args(argv)

    # Handle --recent
    if args.recent is not None:
        results = get_recent_items(args.recent)
        print(f"\n=== Items from last {args.recent} days ({len(results)}) ===\n")
        for r in results:
            print(f"[{_text(r['date'])[:10]}] {r['source']}: {_text(r['content'])[:100]}...")
        return 0

    # Need query for search
    if not args.query:
        parser.print_help()
        return 1

    query = " ".join(args.query)

    # Determine what to search
    search_all = not (args.memory or args.history or args.config)

    output = []

    if search_all or args.memory:
        results = search_memory(query, args.case_insensitive)
        output.append(format_results(results, "memory"))

    if search_all or args.history:
        results = search_history(query, args.case_insensitive)
        output.append(format_results(results, "history"))

    if search_all or args.config:
        results = search_config(query, args.case_insensitive)
        output.append(format_results(results, "config"))

    # U+1F50D "left-pointing magnifying glass"
    print(f"\n\U0001F50D Search: '{query}'")
    print("".join(output))

    return 0


if __name__ == "__main__":
    sys.exit(main())