#!/usr/bin/env python3
"""
Google Calendar Delegation Helper

Fetches calendar events via Google Calendar API, delegates analysis to
appropriate model tier.

Usage:
    gcal_delegate.py today
    gcal_delegate.py tomorrow
    gcal_delegate.py week
    gcal_delegate.py next
    gcal_delegate.py summary
"""

import sys
import os
import json
import argparse
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
from zoneinfo import ZoneInfo

# Local timezone for display
LOCAL_TZ = ZoneInfo('America/Los_Angeles')

# OAuth setup - reuse Gmail credentials location
CREDENTIALS_PATH = Path.home() / ".gmail-mcp" / "credentials.json"
TOKEN_PATH = Path.home() / ".gmail-mcp" / "calendar_token.json"
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

# Claude CLI path
CLAUDE_CLI = "/home/linuxbrew/.linuxbrew/bin/claude"


def _write_private_file(path: Path, text: str) -> None:
    """Write *text* to *path*, readable and writable by the owner only.

    Used for the OAuth token, which grants calendar access and must not be
    left world-readable by a permissive umask.
    """
    fd = os.open(str(path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as handle:
        handle.write(text)
    # os.open's mode only applies when the file is created; tighten a
    # pre-existing file as well.
    os.chmod(str(path), 0o600)


def get_calendar_service():
    """Authenticate and return a Calendar API service object.

    Loads a cached token from TOKEN_PATH when present, refreshes it if it is
    expired, and otherwise runs the interactive OAuth flow using
    CREDENTIALS_PATH. A newly obtained or refreshed token is written back to
    TOKEN_PATH.

    Exits the process with status 1 (after printing a JSON error) when the
    Google client libraries or the OAuth client secrets file are missing.
    """
    try:
        from google.oauth2.credentials import Credentials
        from google.auth.transport.requests import Request
        from google_auth_oauthlib.flow import InstalledAppFlow
        from googleapiclient.discovery import build
    except ImportError:
        print(json.dumps({
            "error": "Missing Google API libraries. Install with: pip install google-auth-oauthlib google-api-python-client"
        }))
        sys.exit(1)

    creds = None

    # Load existing token
    if TOKEN_PATH.exists():
        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)

    # Refresh or create new credentials
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            try:
                creds.refresh(Request())
            except Exception:
                # Refresh failed (e.g. revoked token); fall back to full flow.
                creds = None

        # Also covers credentials that are invalid but could not be
        # refreshed, which would otherwise be saved and used as-is.
        if not creds or not creds.valid:
            if not CREDENTIALS_PATH.exists():
                print(json.dumps({
                    "error": f"OAuth credentials not found at {CREDENTIALS_PATH}. Set up Google Cloud OAuth first."
                }))
                sys.exit(1)

            flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_PATH), SCOPES)
            creds = flow.run_local_server(port=0)

        # Save token for future use
        _write_private_file(TOKEN_PATH, creds.to_json())

    return build('calendar', 'v3', credentials=creds)


def parse_event_time(raw: str) -> datetime:
    """Parse a Calendar API 'dateTime' or 'date' string into an aware datetime.

    Timed values ('2024-01-15T09:00:00-08:00', or with a trailing 'Z') keep
    their own offset. Date-only values (all-day events) are interpreted as
    midnight in LOCAL_TZ. A timed value without any offset is also assumed to
    be in LOCAL_TZ so the result is always timezone-aware and comparable.
    """
    if 'T' in raw:
        # fromisoformat() only accepts a trailing 'Z' from Python 3.11 on.
        parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    else:
        parsed = datetime.strptime(raw, '%Y-%m-%d')
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=LOCAL_TZ)
    return parsed


def starts_after(event: dict, moment: datetime) -> bool:
    """Return True when *event* (as built by fetch_events) starts after *moment*.

    Compares real instants rather than ISO strings: event start strings carry
    a local UTC offset, so lexicographic comparison against a UTC timestamp
    gives wrong answers.
    """
    return parse_event_time(event['start']) > moment


def fetch_events(start: datetime, end: datetime, max_results: int = 50) -> list[dict]:
    """Fetch events from the primary calendar between *start* and *end*.

    Recurring events are expanded into single instances and results are
    ordered by start time. Each returned dict contains the raw start/end
    strings plus display-oriented fields computed in LOCAL_TZ ('date',
    'day_name', 'start_formatted'); 'duration_mins' is None for all-day
    events and 'description' is truncated to 100 characters.
    """
    service = get_calendar_service()

    events_result = service.events().list(
        calendarId='primary',
        timeMin=start.isoformat(),
        timeMax=end.isoformat(),
        maxResults=max_results,
        singleEvents=True,
        orderBy='startTime'
    ).execute()

    events = []
    for event in events_result.get('items', []):
        # Timed events carry 'dateTime'; all-day events carry only 'date'.
        start_raw = event['start'].get('dateTime', event['start'].get('date'))
        end_raw = event['end'].get('dateTime', event['end'].get('date'))
        all_day = 'T' not in start_raw

        start_dt = parse_event_time(start_raw)
        # All-day values are already at LOCAL_TZ midnight, so this conversion
        # leaves their calendar date untouched.
        start_local = start_dt.astimezone(LOCAL_TZ)

        # Calculate duration
        if all_day:
            duration_mins = None
        else:
            end_dt = parse_event_time(end_raw)
            duration_mins = int((end_dt - start_dt).total_seconds() / 60)

        # Extract location (could be URL or physical)
        location = event.get('location', '')
        hangout_link = event.get('hangoutLink', '')
        location_is_url = location.startswith('http')
        meeting_link = hangout_link or (location if location_is_url else '')
        physical_location = location if location and not location_is_url else ''

        # Attendee count
        attendee_count = len(event.get('attendees', []))

        events.append({
            'id': event['id'],
            'title': event.get('summary', '(no title)'),
            'start': start_raw,
            'start_formatted': 'All day' if all_day else start_local.strftime('%I:%M %p').lstrip('0'),
            'end': end_raw,
            'duration_mins': duration_mins,
            'all_day': all_day,
            'location': physical_location,
            'meeting_link': meeting_link,
            'attendee_count': attendee_count,
            'description': (event.get('description', '') or '')[:100],
            'date': start_local.strftime('%Y-%m-%d'),
            'day_name': start_local.strftime('%A'),
        })

    return events


def format_duration(mins: Optional[int]) -> str:
    """Format a duration in minutes as '(45m)', '(2h)' or '(1h 30m)'.

    Returns an empty string when *mins* is None (all-day events).
    """
    if mins is None:
        return ""
    if mins < 60:
        return f"({mins}m)"
    hours, remaining = divmod(mins, 60)
    if remaining == 0:
        return f"({hours}h)"
    return f"({hours}h {remaining}m)"


def delegate(model: str, system: str, prompt: str, max_tokens: int = 4096) -> dict:
    """Delegate a task to Claude CLI using subscription.

    Runs CLAUDE_CLI in --print mode with JSON output and a 120 second
    timeout. Returns a dict with 'success', 'model', 'content' and 'usage' on
    success, or a dict with a single 'error' key on failure. If the CLI
    output is not valid JSON, the raw stdout is returned as 'content'.

    NOTE: *max_tokens* is accepted for interface compatibility but is not
    passed to the CLI.
    """
    cmd = [
        CLAUDE_CLI,
        "--print",
        "--model", model,
        "--system-prompt", system,
        "--output-format", "json",
        prompt
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=120
        )
    except subprocess.TimeoutExpired:
        return {"error": "Claude CLI timed out"}
    except FileNotFoundError:
        return {"error": f"Claude CLI not found at {CLAUDE_CLI}"}
    except Exception as e:
        return {"error": f"Unexpected error: {e}"}

    if result.returncode != 0:
        return {"error": f"Claude CLI error: {result.stderr}"}

    try:
        output = json.loads(result.stdout)
    except json.JSONDecodeError:
        # Plain-text output: pass it through unchanged.
        return {
            "success": True,
            "model": model,
            "content": result.stdout.strip(),
            "usage": {}
        }

    try:
        return {
            "success": True,
            "model": model,
            "content": output.get("result", ""),
            "usage": output.get("usage", {})
        }
    except Exception as e:
        # Valid JSON that is not an object (e.g. a list or a string).
        return {"error": f"Unexpected error: {e}"}


def _local_midnight(day_offset: int = 0) -> datetime:
    """Return midnight in LOCAL_TZ for today plus *day_offset* days."""
    now_local = datetime.now(LOCAL_TZ)
    shifted = now_local + timedelta(days=day_offset)
    return shifted.replace(hour=0, minute=0, second=0, microsecond=0)


def _day_agenda(operation: str, day_offset: int) -> dict:
    """Build the single-day agenda payload shared by 'today' and 'tomorrow'."""
    start_local = _local_midnight(day_offset)
    end_local = start_local + timedelta(days=1)

    # Convert to UTC for API query
    events = fetch_events(
        start_local.astimezone(timezone.utc),
        end_local.astimezone(timezone.utc),
    )

    return {
        "tier": "haiku",
        "operation": operation,
        "date": start_local.strftime('%Y-%m-%d'),
        "day_name": start_local.strftime('%A'),
        "display_date": start_local.strftime('%A, %b %d'),
        "events": events,
        "count": len(events)
    }


def cmd_today() -> dict:
    """Get today's events ("today" is determined in LOCAL_TZ)."""
    return _day_agenda("today", 0)


def cmd_tomorrow() -> dict:
    """Get tomorrow's events ("tomorrow" is determined in LOCAL_TZ)."""
    return _day_agenda("tomorrow", 1)


def cmd_week() -> dict:
    """Get next 7 days of events, grouped by local calendar day.

    'days' always has seven entries, starting today. Events whose local start
    date falls outside those seven days (e.g. a multi-day event that began
    earlier) are counted in 'total_events' but not listed under any day.
    """
    start_local = _local_midnight()
    # Exclusive upper bound of the query window. NOTE: 'end_date' and
    # 'display_range' report this bound, i.e. one day past the last listed day.
    end_local = start_local + timedelta(days=7)

    # Convert to UTC for API query
    events = fetch_events(
        start_local.astimezone(timezone.utc),
        end_local.astimezone(timezone.utc),
        max_results=100,
    )

    # Group by date (using local dates)
    by_day = {}
    for offset in range(7):
        day = start_local + timedelta(days=offset)
        day_str = day.strftime('%Y-%m-%d')
        by_day[day_str] = {
            "date": day_str,
            "day_name": day.strftime('%A'),
            "display_date": day.strftime('%A, %b %d'),
            "events": []
        }

    for event in events:
        bucket = by_day.get(event['date'])
        if bucket is not None:
            bucket['events'].append(event)

    return {
        "tier": "haiku",
        "operation": "week",
        "start_date": start_local.strftime('%Y-%m-%d'),
        "end_date": end_local.strftime('%Y-%m-%d'),
        "display_range": f"{start_local.strftime('%b %d')} - {end_local.strftime('%b %d')}",
        "days": list(by_day.values()),
        "total_events": len(events)
    }


def cmd_next() -> dict:
    """Get the next upcoming event within the coming 7 days.

    Events that have already started (including all-day events for the
    current local day) are skipped.
    """
    now = datetime.now(timezone.utc)
    end = now + timedelta(days=7)  # Look ahead 7 days max

    events = fetch_events(now, end, max_results=5)

    # Filter to events that haven't started yet
    upcoming = [e for e in events if starts_after(e, now)]

    if not upcoming:
        return {
            "tier": "haiku",
            "operation": "next",
            "event": None,
            "message": "No upcoming events in the next 7 days."
        }

    return {
        "tier": "haiku",
        "operation": "next",
        "event": upcoming[0]
    }


def cmd_summary() -> dict:
    """Get smart summary of the week using Sonnet.

    Returns a canned message without calling the model when the week has no
    events. If delegation fails, 'summary' carries the error text instead.
    """
    # First fetch the week's events
    week_data = cmd_week()

    if week_data['total_events'] == 0:
        return {
            "tier": "haiku",
            "operation": "summary",
            "summary": "Your week is clear — no events scheduled."
        }

    # Build context for Sonnet
    parts = [f"Calendar events for {week_data['display_range']}:\n\n"]
    for day in week_data['days']:
        parts.append(f"--- {day['display_date']} ---\n")
        if not day['events']:
            parts.append("(no events)\n")
        for event in day['events']:
            line = f"  {event['start_formatted']}: {event['title']}"
            if event['duration_mins']:
                line += f" {format_duration(event['duration_mins'])}"
            if event['attendee_count'] > 1:
                line += f" [{event['attendee_count']} attendees]"
            parts.append(line + "\n")
        parts.append("\n")
    context = "".join(parts)

    system = """You are a calendar analysis assistant. Provide concise, actionable summaries.
Focus on: busy vs free periods, meeting load, potential conflicts, and strategic insights.
Be brief — 2-4 sentences max. Highlight what's important."""

    prompt = f"""Analyze this calendar and provide a brief summary:

{context}

Key points: overall busyness, best times for deep work, anything notable."""

    result = delegate("sonnet", system, prompt, max_tokens=512)

    return {
        "tier": "sonnet",
        "operation": "summary",
        "total_events": week_data['total_events'],
        "summary": result.get("content", result.get("error", "Failed to analyze")),
        "usage": result.get("usage", {})
    }


def smart_default() -> str:
    """Return default command based on time of day in LOCAL_TZ.

    Before 18:00 local time the default is "today"; afterwards "tomorrow".
    """
    # Use LOCAL_TZ (not the system timezone) so the default agrees with how
    # cmd_today/cmd_tomorrow decide what "today" is.
    hour = datetime.now(LOCAL_TZ).hour
    return "today" if hour < 18 else "tomorrow"


def main():
    """Parse the command line, run the chosen command and print JSON.

    With no subcommand, smart_default() picks one. Any exception raised by a
    command is reported as a JSON error object and the process exits with
    status 1.
    """
    parser = argparse.ArgumentParser(description="Google Calendar operations with tiered delegation")
    subparsers = parser.add_subparsers(dest="command")

    # Subcommands
    subparsers.add_parser("today", help="Today's agenda (Haiku tier)")
    subparsers.add_parser("tomorrow", help="Tomorrow's agenda (Haiku tier)")
    subparsers.add_parser("week", help="Next 7 days (Haiku tier)")
    subparsers.add_parser("next", help="Next upcoming event (Haiku tier)")
    subparsers.add_parser("summary", help="Smart week summary (Sonnet tier)")

    args = parser.parse_args()

    # Smart default if no command
    command = args.command or smart_default()

    handlers = {
        "today": cmd_today,
        "tomorrow": cmd_tomorrow,
        "week": cmd_week,
        "next": cmd_next,
        "summary": cmd_summary,
    }

    try:
        handler = handlers.get(command)
        if handler is None:
            result = {"error": f"Unknown command: {command}"}
        else:
            result = handler()

        print(json.dumps(result, indent=2, default=str))

    except Exception as e:
        print(json.dumps({"error": str(e)}))
        sys.exit(1)


if __name__ == "__main__":
    main()