Files
claude-code/skills/morning-report/scripts/generate.py
OpenCode Test 45b7e4bcf7 Improve morning report collectors and add section toggling
- Add is_section_enabled() to support per-section enable/disable in config
- Update Python path from 3.13 to 3.14 for gmail venv
- Disable tasks section by default (enabled: false in config)
- Apply code formatting improvements (black/ruff style)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-04 23:44:24 -08:00

232 lines
6.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""Morning report orchestrator - generates the daily dashboard."""
import json
import logging
import os
import sys
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
# Add this script's own directory to the front of the import path so the
# `collectors` imports below can be resolved from there.
sys.path.insert(0, str(Path(__file__).parent))
from collectors import weather, stocks, infra, news

# These may fail if the gmail venv is not activated. GOOGLE_COLLECTORS records
# whether the import succeeded so later code can skip the Google sections.
try:
    from collectors import gmail, gcal, gtasks

    GOOGLE_COLLECTORS = True
except ImportError:
    GOOGLE_COLLECTORS = False

# Setup logging: make sure the log directory exists, then send every record
# at INFO and above to both the log file and a stream handler.
LOG_PATH = Path.home() / ".claude/logs/morning-report.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler(LOG_PATH), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
def load_config() -> dict:
    """Load the report configuration from ``config.json``.

    The file is looked up one directory above the directory that contains
    this script.

    Returns:
        The parsed JSON document, or an empty dict when the file does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    config_path = Path(__file__).parent.parent / "config.json"
    try:
        # Read explicitly as UTF-8 instead of the locale default, and read
        # directly (EAFP) rather than checking exists() first, which leaves a
        # window between the check and the read.
        raw = config_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return {}
    return json.loads(raw)
def collect_section(name: str, collector_func, config: dict) -> dict:
    """Run one collector and convert any failure into an error section.

    Args:
        name: Display name of the section; used in log lines and in the
            fallback section.
        collector_func: Callable invoked as ``collector_func(config)``.
        config: Full configuration dict, passed through to the collector.

    Returns:
        The collector's result dict on success. If the collector raises, or
        returns something other than a dict, a placeholder dict with
        ``section``, ``icon``, ``content`` and ``error`` keys.
    """
    try:
        logger.info(f"Collecting {name}...")
        result = collector_func(config)
        # The renderer calls ``.get`` on every section, so a non-dict result
        # (e.g. None) would crash the whole report later. Treat it as a
        # failure of this one collector instead.
        if not isinstance(result, dict):
            raise TypeError(
                f"collector returned {type(result).__name__}, expected dict"
            )
    except Exception as e:
        # Deliberate catch-all: one broken collector must not abort the
        # report. logger.exception keeps the traceback in the log.
        logger.exception(f"Collector {name} failed: {e}")
        return {
            "section": name,
            "icon": "",
            "content": f"⚠️ {name} unavailable: {e}",
            "error": str(e),
        }
    logger.info(f"Collected {name}: OK")
    return result
def is_section_enabled(name: str, config: dict) -> bool:
    """Return whether a report section is enabled in the configuration.

    The section is looked up under the lower-cased ``name``. Accepted forms:

    * key absent                      -> enabled
    * mapping with ``"enabled"``      -> truthiness of that value
    * mapping without ``"enabled"``   -> enabled
    * bare boolean (``"tasks": false``) -> that boolean
    * any other value (``null``, a list, ...) -> enabled

    Args:
        name: Section name; matched case-insensitively against config keys.
        config: Full configuration dict.

    Returns:
        True if the section should be collected, False otherwise.
    """
    section_config = config.get(name.lower(), {})
    if isinstance(section_config, bool):
        # Shorthand form: the section value itself is the on/off switch.
        return section_config
    if not isinstance(section_config, dict):
        # Not a mapping, so there is no "enabled" flag to read; calling
        # .get() on it would raise. Fall back to the default.
        return True
    return bool(section_config.get("enabled", True))
def collect_all(config: dict) -> list:
    """Collect every enabled section concurrently.

    Weather, Stocks, Infra and News are always collected. Email, Calendar and
    Tasks are added only when the Google collectors were importable and the
    matching ``email`` / ``calendar`` / ``tasks`` section is enabled.

    Args:
        config: Full configuration dict, passed to every collector.

    Returns:
        One section dict per collector, in completion order (not submission
        order).
    """
    collectors = [
        ("Weather", weather.collect),
        ("Stocks", stocks.collect),
        ("Infra", infra.collect),
        ("News", news.collect),
    ]
    if GOOGLE_COLLECTORS:
        # (config key, display name, collector). Built inside this branch
        # because the Google modules only exist when their import succeeded.
        optional = (
            ("email", "Email", gmail.collect),
            ("calendar", "Calendar", gcal.collect),
            ("tasks", "Tasks", gtasks.collect),
        )
        collectors.extend(
            (label, func)
            for key, label, func in optional
            if is_section_enabled(key, config)
        )
    else:
        logger.warning("Google collectors not available - run with gmail venv")

    results = []
    # One worker per collector so none queues behind another; the list always
    # holds at least the four core collectors, so the size is never zero.
    with ThreadPoolExecutor(max_workers=len(collectors)) as executor:
        futures = {
            executor.submit(collect_section, name, func, config): name
            for name, func in collectors
        }
        for future in as_completed(futures):
            name = futures[future]
            try:
                results.append(future.result())
            except Exception as e:
                # collect_section already traps collector errors; this is a
                # last line of defence so one future cannot abort the rest.
                logger.error(f"Future {name} exception: {e}")
                results.append(
                    {
                        "section": name,
                        "icon": "",
                        "content": f"⚠️ {name} failed: {e}",
                        "error": str(e),
                    }
                )
    return results
def render_report(sections: list, config: dict) -> str:
    """Render the collected sections as a markdown report.

    Args:
        sections: Section dicts. The ``section``, ``icon`` and ``content``
            keys are read; each has a fallback when the key is missing.
        config: Accepted for interface compatibility; not used here.

    Returns:
        The report as one newline-joined markdown string: a dated title, one
        ``##`` block per section, then a footer with the generation time.
    """
    now = datetime.now()
    lines = [f"# Morning Report - {now.strftime('%a %b %d, %Y')}", ""]

    # Preferred display order; several aliases are listed for the same slot.
    order = [
        "Weather",
        "Email",
        "Calendar",
        "Today",
        "Stocks",
        "Tasks",
        "Infra",
        "Infrastructure",
        "News",
        "Tech News",
    ]
    rank = {name: position for position, name in enumerate(order)}

    # sorted() is stable: sections whose name is not in `order` sort last in
    # their incoming order, and sections that share a name are all kept
    # rather than overwriting each other.
    ordered = sorted(
        sections, key=lambda s: rank.get(s.get("section"), len(order))
    )
    for s in ordered:
        content = s.get("content")
        lines.append(f"## {s.get('icon', '📌')} {s.get('section', 'Unknown')}")
        # str.join below only accepts strings, so coerce anything else.
        lines.append("No data" if content is None else str(content))
        lines.append("")

    # Footer. NOTE: the "PT" label is hard-coded; the timestamp itself is the
    # machine's naive local time.
    lines.extend(["---", f"*Generated: {now.strftime('%Y-%m-%d %H:%M:%S')} PT*"])
    return "\n".join(lines)
def save_report(content: str, config: dict) -> Path:
    """Write the report to disk and optionally archive a dated copy.

    Settings are read from ``config["output"]``:

    * ``path``: report location (default ``~/.claude/reports/morning.md``)
    * ``archive``: whether to keep a dated copy (default True)
    * ``archive_days``: age limit passed to the archive cleanup (default 30)

    Args:
        content: Rendered report text.
        config: Full configuration dict.

    Returns:
        The path the main report was written to.
    """
    output_config = config.get("output", {})
    output_path = Path(
        output_config.get("path", "~/.claude/reports/morning.md")
    ).expanduser()
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Write main report. The report text contains emoji, so write UTF-8
    # explicitly rather than depending on the locale's default encoding.
    output_path.write_text(content, encoding="utf-8")
    logger.info(f"Report saved to {output_path}")

    if not output_config.get("archive", True):
        return output_path

    # Archive: copy today's report to archive/<YYYY-MM-DD>.md next to it.
    archive_dir = output_path.parent / "archive"
    archive_dir.mkdir(parents=True, exist_ok=True)
    date_str = datetime.now().strftime("%Y-%m-%d")
    archive_path = archive_dir / f"{date_str}.md"
    shutil.copy(output_path, archive_path)
    logger.info(f"Archived to {archive_path}")

    # Cleanup old archives
    archive_days = output_config.get("archive_days", 30)
    cleanup_archives(archive_dir, archive_days)
    return output_path
def cleanup_archives(archive_dir: Path, max_days: int) -> None:
    """Remove dated archives older than ``max_days``.

    Only ``*.md`` files whose stem parses as ``YYYY-MM-DD`` are considered;
    anything else in the directory is left untouched.

    Args:
        archive_dir: Directory holding the archived reports.
        max_days: Archives dated more than this many days before now are
            deleted.
    """
    from datetime import timedelta

    cutoff = datetime.now() - timedelta(days=max_days)
    for archive in archive_dir.glob("*.md"):
        try:
            # Parse date from filename
            file_date = datetime.strptime(archive.stem, "%Y-%m-%d")
        except ValueError:
            continue  # Skip files that don't match date pattern
        if file_date >= cutoff:
            continue
        try:
            archive.unlink()
        except OSError as e:
            # Cleanup is best-effort: a file that cannot be deleted must not
            # fail a run whose report has already been written.
            logger.warning(f"Could not remove old archive {archive}: {e}")
            continue
        logger.info(f"Removed old archive: {archive}")
def main():
    """Generate the morning report end to end and return the exit status."""
    logger.info("=" * 50)
    logger.info("Starting morning report generation")

    # Stage 1: configuration.
    config = load_config()
    logger.info(f"Loaded config: {len(config)} sections")

    # Stage 2: run the collectors.
    sections = collect_all(config)
    logger.info(f"Collected {len(sections)} sections")

    # Stage 3: render to markdown and persist.
    output_path = save_report(render_report(sections, config), config)

    print(f"\n✅ Morning report generated: {output_path}")
    print(f" View with: cat {output_path}")
    logger.info("Morning report generation complete")
    return 0


# Script entry point: main()'s return value becomes the process exit code.
if __name__ == "__main__":
    sys.exit(main())