#!/usr/bin/env python3
"""Morning report orchestrator - generates the daily dashboard."""

import json
import logging
import os
import shutil
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from pathlib import Path

# Add collectors to path
sys.path.insert(0, str(Path(__file__).parent))

from collectors import weather, stocks, infra, news

# These may fail if gmail venv not activated
try:
    from collectors import gmail, gcal, gtasks

    GOOGLE_COLLECTORS = True
except ImportError:
    GOOGLE_COLLECTORS = False

# Setup logging
LOG_PATH = Path.home() / ".claude/logs/morning-report.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        # Explicit UTF-8 so non-ASCII text in collector errors cannot break
        # logging on hosts whose locale encoding is not UTF-8.
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Preferred rendering order of report sections; anything not listed here is
# appended after these, in the order it was collected.
_SECTION_ORDER = (
    "Weather",
    "Email",
    "Calendar",
    "Today",
    "Stocks",
    "Tasks",
    "Infra",
    "Infrastructure",
    "News",
    "Tech News",
)


def load_config() -> dict:
    """Load configuration from config.json.

    The file is looked up one directory above this script.

    Returns:
        The parsed JSON document, or an empty dict when the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    config_path = Path(__file__).parent.parent / "config.json"
    if config_path.exists():
        # JSON is UTF-8 by specification; do not depend on the locale codec.
        return json.loads(config_path.read_text(encoding="utf-8"))
    return {}


def _error_section(name: str, reason: str, error: Exception) -> dict:
    """Build the placeholder section shown when a collector cannot deliver."""
    return {
        "section": name,
        "icon": "❓",
        "content": f"⚠️ {name} {reason}: {error}",
        "error": str(error),
    }


def collect_section(name: str, collector_func, config: dict) -> dict:
    """Run a collector and handle errors.

    Args:
        name: Display name of the section, used in logs and error output.
        collector_func: Callable invoked as ``collector_func(config)``.
        config: The loaded configuration, passed through to the collector.

    Returns:
        The collector's section dict, or a placeholder error section when the
        collector raises or returns something that is not a dict.
    """
    try:
        logger.info("Collecting %s...", name)
        result = collector_func(config)
        # render_report() calls .get() on every section, so a non-dict result
        # would crash the whole report later; turn it into an error here.
        if not isinstance(result, dict):
            raise TypeError(
                f"collector returned {type(result).__name__}, expected dict"
            )
        logger.info("Collected %s: OK", name)
        return result
    except Exception as e:
        # Deliberately broad: one failing collector must not sink the report.
        logger.error("Collector %s failed: %s", name, e)
        return _error_section(name, "unavailable", e)


def is_section_enabled(name: str, config: dict) -> bool:
    """Check if a section is enabled in config.

    Args:
        name: Section name; it is lower-cased to form the config key.
        config: The loaded configuration.

    Returns:
        The section's ``enabled`` value, defaulting to True when the section
        or the key is absent. A bare boolean section value (for example
        ``"email": false``) is honoured directly; any other non-dict value
        falls back to the enabled default.
    """
    section_config = config.get(name.lower(), {})
    if isinstance(section_config, bool):
        return section_config
    if not isinstance(section_config, dict):
        return True
    return section_config.get("enabled", True)


def collect_all(config: dict) -> list:
    """Collect all sections in parallel.

    Args:
        config: The loaded configuration, passed to every collector.

    Returns:
        One section dict per collector, in completion order.
    """
    collectors = [
        ("Weather", weather.collect),
        ("Stocks", stocks.collect),
        ("Infra", infra.collect),
        ("News", news.collect),
    ]

    if GOOGLE_COLLECTORS:
        if is_section_enabled("email", config):
            collectors.append(("Email", gmail.collect))
        if is_section_enabled("calendar", config):
            collectors.append(("Calendar", gcal.collect))
        if is_section_enabled("tasks", config):
            collectors.append(("Tasks", gtasks.collect))
    else:
        logger.warning("Google collectors not available - run with gmail venv")

    results = []
    with ThreadPoolExecutor(max_workers=6) as executor:
        futures = {
            executor.submit(collect_section, name, func, config): name
            for name, func in collectors
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                # collect_section already traps collector errors, so this is a
                # second line of defence for failures in the wrapper itself.
                logger.error("Future %s exception: %s", name, e)
                results.append(_error_section(name, "failed", e))

    return results


def _render_section(section: dict) -> list:
    """Return the markdown lines (heading, body, blank line) for one section."""
    icon = section.get("icon", "📌")
    title = section.get("section", "Unknown")
    return [f"## {icon} {title}", section.get("content", "No data"), ""]


def render_report(sections: list, config: dict) -> str:
    """Render the markdown report.

    Args:
        sections: Section dicts as produced by the collectors.
        config: The loaded configuration (currently not consulted here).

    Returns:
        The complete report as a single markdown string.
    """
    now = datetime.now()
    date_str = now.strftime("%a %b %d, %Y")

    lines = [f"# Morning Report - {date_str}", ""]

    # Known sections first, in the preferred order. Iterating the section list
    # itself (rather than a name->section dict) keeps every section even when
    # two of them share a name.
    for name in _SECTION_ORDER:
        for s in sections:
            if s.get("section", "") == name:
                lines.extend(_render_section(s))

    # Add any unordered sections
    for s in sections:
        if s.get("section") not in _SECTION_ORDER:
            lines.extend(_render_section(s))

    # Footer
    # NOTE(review): the "PT" suffix is hard-coded while the timestamp is the
    # host's naive local time - confirm the host really runs in Pacific time.
    lines.extend(["---", f"*Generated: {now.strftime('%Y-%m-%d %H:%M:%S')} PT*"])

    return "\n".join(lines)


def save_report(content: str, config: dict) -> Path:
    """Save report to file and archive.

    Args:
        content: The rendered report text.
        config: The loaded configuration; its ``output`` mapping may set
            ``path``, ``archive`` and ``archive_days``.

    Returns:
        The path the main report was written to.
    """
    output_config = config.get("output", {})
    output_path = Path(
        output_config.get("path", "~/.claude/reports/morning.md")
    ).expanduser()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Write main report. The report contains emoji, so the encoding must be
    # explicit: the locale default raises UnicodeEncodeError on non-UTF-8 hosts.
    output_path.write_text(content, encoding="utf-8")
    logger.info("Report saved to %s", output_path)

    # Archive if enabled
    if output_config.get("archive", True):
        archive_dir = output_path.parent / "archive"
        archive_dir.mkdir(parents=True, exist_ok=True)

        date_str = datetime.now().strftime("%Y-%m-%d")
        archive_path = archive_dir / f"{date_str}.md"
        shutil.copy(output_path, archive_path)
        logger.info("Archived to %s", archive_path)

        # Cleanup old archives
        archive_days = output_config.get("archive_days", 30)
        cleanup_archives(archive_dir, archive_days)

    return output_path


def cleanup_archives(archive_dir: Path, max_days: int):
    """Remove archives older than max_days.

    Only ``*.md`` files whose stem parses as ``YYYY-MM-DD`` are considered;
    everything else in the directory is left untouched.

    Args:
        archive_dir: Directory holding the dated archive files.
        max_days: Age in days beyond which an archive is deleted.
    """
    cutoff = datetime.now() - timedelta(days=max_days)

    for f in archive_dir.glob("*.md"):
        try:
            # Parse date from filename
            file_date = datetime.strptime(f.stem, "%Y-%m-%d")
        except ValueError:
            continue  # Skip files that don't match date pattern

        if file_date < cutoff:
            f.unlink()
            logger.info("Removed old archive: %s", f)


def main():
    """Main entry point.

    Returns:
        Process exit code; 0 on success.
    """
    logger.info("=" * 50)
    logger.info("Starting morning report generation")

    config = load_config()
    logger.info("Loaded config: %s sections", len(config))

    sections = collect_all(config)
    logger.info("Collected %s sections", len(sections))

    report = render_report(sections, config)
    output_path = save_report(report, config)

    print(f"\n✅ Morning report generated: {output_path}")
    print(f" View with: cat {output_path}")

    logger.info("Morning report generation complete")
    return 0


if __name__ == "__main__":
    sys.exit(main())