- Query using local "today/tomorrow/week" boundaries converted to UTC - Display event times converted to America/Los_Angeles timezone - Headers show local dates (Dec 31, 2025 instead of Jan 1, 2026) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
412 lines
13 KiB
Python
Executable File
412 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Google Calendar Delegation Helper
|
|
|
|
Fetches calendar events via Google Calendar API, delegates analysis to appropriate model tier.
|
|
|
|
Usage:
|
|
gcal_delegate.py today
|
|
gcal_delegate.py tomorrow
|
|
gcal_delegate.py week
|
|
gcal_delegate.py next
|
|
gcal_delegate.py summary
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import json
|
|
import argparse
|
|
import subprocess
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from zoneinfo import ZoneInfo
|
|
|
|
# Time zone used to compute the "today"/"tomorrow"/"week" boundaries and to
# render event times for display.
LOCAL_TZ = ZoneInfo('America/Los_Angeles')

# OAuth setup - reuse Gmail credentials location.
# CREDENTIALS_PATH is the OAuth client-secrets file; TOKEN_PATH is where the
# authorized calendar token is cached between runs.
CREDENTIALS_PATH = Path.home() / ".gmail-mcp" / "credentials.json"
TOKEN_PATH = Path.home() / ".gmail-mcp" / "calendar_token.json"
# Read-only calendar scope requested during authorization.
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']

# Absolute path of the Claude CLI executable invoked by delegate().
CLAUDE_CLI = "/home/linuxbrew/.linuxbrew/bin/claude"
|
|
|
|
|
|
def get_calendar_service():
    """Authenticate and return a Calendar API (v3) service object.

    Credentials are resolved in this order:

    1. A cached token at ``TOKEN_PATH`` that is still valid is used as-is.
    2. An expired token with a refresh token is refreshed.
    3. Otherwise the interactive OAuth flow is run against
       ``CREDENTIALS_PATH``.

    Whenever credentials are refreshed or newly created they are written back
    to ``TOKEN_PATH`` with owner-only permissions.

    Exits the process with status 1 (after printing a JSON ``{"error": ...}``
    object) if the Google client libraries are not installed or the OAuth
    client-secrets file is missing.
    """
    try:
        from google.oauth2.credentials import Credentials
        from google.auth.transport.requests import Request
        from google_auth_oauthlib.flow import InstalledAppFlow
        from googleapiclient.discovery import build
    except ImportError:
        print(json.dumps({
            "error": "Missing Google API libraries. Install with: pip install google-auth-oauthlib google-api-python-client"
        }))
        sys.exit(1)

    creds = None

    # Load existing token
    if TOKEN_PATH.exists():
        creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)

    # Fast path: the cached token is usable, nothing needs to be rewritten.
    if creds and creds.valid:
        return build('calendar', 'v3', credentials=creds)

    # Try a silent refresh first.
    if creds and creds.expired and creds.refresh_token:
        try:
            creds.refresh(Request())
        except Exception:
            # Deliberate best-effort: any refresh failure (revoked token,
            # network error, ...) falls back to the interactive flow below.
            creds = None

    # Credentials that are still unusable here (refresh failed, no refresh
    # token, or invalid without being expired) must not be saved or used:
    # run the interactive flow instead.
    if not creds or not creds.valid:
        if not CREDENTIALS_PATH.exists():
            print(json.dumps({
                "error": f"OAuth credentials not found at {CREDENTIALS_PATH}. Set up Google Cloud OAuth first."
            }))
            sys.exit(1)

        flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_PATH), SCOPES)
        creds = flow.run_local_server(port=0)

    # Save token for future use. The file holds OAuth tokens, so keep it
    # readable by the owner only (chmod also tightens a pre-existing file).
    TOKEN_PATH.touch(mode=0o600, exist_ok=True)
    TOKEN_PATH.chmod(0o600)
    with open(TOKEN_PATH, 'w') as token:
        token.write(creds.to_json())

    return build('calendar', 'v3', credentials=creds)
|
|
|
|
|
|
def fetch_events(start: datetime, end: datetime, max_results: int = 50) -> list[dict]:
    """Return normalized primary-calendar events between ``start`` and ``end``.

    Args:
        start: Lower bound of the query window, sent as ``timeMin``.
        end: Upper bound of the query window, sent as ``timeMax``.
        max_results: Maximum number of events requested from the API.

    Returns:
        One dict per event, in the order the API returned them (the query asks
        for expanded recurring events ordered by start time). Each dict holds
        ``id``, ``title``, the raw ``start``/``end`` strings,
        ``start_formatted`` (clock time in ``LOCAL_TZ`` or ``'All day'``),
        ``duration_mins`` (``None`` for all-day events), ``all_day``,
        ``location``, ``meeting_link``, ``attendee_count``, ``description``
        (first 100 characters), ``date`` and ``day_name``.
    """

    def parse_stamp(raw):
        # Timed events carry a full ISO timestamp (contains 'T'); all-day
        # events carry only a YYYY-MM-DD date and yield a naive datetime.
        if 'T' in raw:
            return datetime.fromisoformat(raw.replace('Z', '+00:00'))
        return datetime.strptime(raw, '%Y-%m-%d')

    response = get_calendar_service().events().list(
        calendarId='primary',
        timeMin=start.isoformat(),
        timeMax=end.isoformat(),
        maxResults=max_results,
        singleEvents=True,
        orderBy='startTime'
    ).execute()

    normalized = []
    for item in response.get('items', []):
        # Start: prefer the timed value, fall back to the all-day date.
        begin_raw = item['start'].get('dateTime', item['start'].get('date'))
        begin_dt = parse_stamp(begin_raw)
        is_all_day = 'T' not in begin_raw
        # All-day events don't need TZ conversion.
        begin_local = begin_dt if is_all_day else begin_dt.astimezone(LOCAL_TZ)

        # End: same two representations as the start.
        finish_raw = item['end'].get('dateTime', item['end'].get('date'))
        finish_dt = parse_stamp(finish_raw)

        # Duration is only meaningful for timed events.
        if is_all_day:
            minutes = None
            clock_label = 'All day'
        else:
            minutes = int((finish_dt - begin_dt).total_seconds() / 60)
            clock_label = begin_local.strftime('%I:%M %p').lstrip('0')

        # The location field may be a URL (treated as a meeting link) or a
        # physical place; a hangout link takes precedence as meeting link.
        venue = item.get('location', '')
        link = item.get('hangoutLink', '')
        if not link:
            link = venue if venue.startswith('http') else ''
        place = ''
        if venue and not venue.startswith('http'):
            place = venue

        normalized.append({
            'id': item['id'],
            'title': item.get('summary', '(no title)'),
            'start': begin_raw,
            'start_formatted': clock_label,
            'end': finish_raw,
            'duration_mins': minutes,
            'all_day': is_all_day,
            'location': place,
            'meeting_link': link,
            'attendee_count': len(item.get('attendees', [])),
            'description': (item.get('description', '') or '')[:100],
            'date': begin_local.strftime('%Y-%m-%d'),
            'day_name': begin_local.strftime('%A'),
        })

    return normalized
|
|
|
|
|
|
def format_duration(mins: int) -> str:
    """Render a minute count as a short parenthesized label.

    Returns ``""`` for ``None``, ``"(Nm)"`` below one hour, ``"(Hh)"`` for
    whole hours and ``"(Hh Mm)"`` otherwise.
    """
    if mins is None:
        return ""
    if mins >= 60:
        whole_hours, leftover = divmod(mins, 60)
        minute_part = f" {leftover}m" if leftover else ""
        return f"({whole_hours}h{minute_part})"
    return f"({mins}m)"
|
|
|
|
|
|
def delegate(model: str, system: str, prompt: str, max_tokens: int = 4096) -> dict:
    """Run a prompt through the Claude CLI and return its reply.

    Args:
        model: Value passed to the CLI's ``--model`` option.
        system: Value passed to the CLI's ``--system-prompt`` option.
        prompt: The user prompt, passed as the final positional argument.
        max_tokens: Accepted for interface compatibility; not forwarded to
            the CLI by this function.

    Returns:
        On success a dict with ``success``, ``model``, ``content`` and
        ``usage``; if the CLI output is not valid JSON the stripped raw
        output becomes ``content``. On any failure a dict with a single
        ``error`` message. This function does not raise.
    """
    try:
        completed = subprocess.run(
            [
                CLAUDE_CLI,
                "--print",
                "--model", model,
                "--system-prompt", system,
                "--output-format", "json",
                prompt,
            ],
            capture_output=True,
            text=True,
            timeout=120,
        )

        if completed.returncode != 0:
            return {"error": f"Claude CLI error: {completed.stderr}"}

        # Start from the plain-text fallback and upgrade it when the output
        # parses as JSON.
        reply = {
            "success": True,
            "model": model,
            "content": completed.stdout.strip(),
            "usage": {},
        }
        try:
            parsed = json.loads(completed.stdout)
        except json.JSONDecodeError:
            return reply

        reply["content"] = parsed.get("result", "")
        reply["usage"] = parsed.get("usage", {})
        return reply

    except subprocess.TimeoutExpired:
        return {"error": "Claude CLI timed out"}
    except FileNotFoundError:
        return {"error": f"Claude CLI not found at {CLAUDE_CLI}"}
    except Exception as exc:
        return {"error": f"Unexpected error: {exc}"}
|
|
|
|
|
|
def cmd_today() -> dict:
    """Build the agenda for the current local day.

    "Today" runs from midnight to the following midnight in ``LOCAL_TZ``;
    both bounds are converted to UTC before querying.

    Returns:
        A dict with ``tier``, ``operation``, the local ``date``, ``day_name``
        and ``display_date``, the list of ``events`` and their ``count``.
    """
    # Local midnight marks the start of "today".
    midnight = datetime.now(LOCAL_TZ).replace(hour=0, minute=0, second=0, microsecond=0)
    next_midnight = midnight + timedelta(days=1)

    # The API query is expressed in UTC.
    agenda = fetch_events(
        midnight.astimezone(timezone.utc),
        next_midnight.astimezone(timezone.utc),
    )

    return {
        "tier": "haiku",
        "operation": "today",
        "date": midnight.strftime('%Y-%m-%d'),
        "day_name": midnight.strftime('%A'),
        "display_date": midnight.strftime('%A, %b %d'),
        "events": agenda,
        "count": len(agenda),
    }
|
|
|
|
|
|
def cmd_tomorrow() -> dict:
    """Build the agenda for the next local day.

    "Tomorrow" runs from the next midnight in ``LOCAL_TZ`` to the midnight
    after it; both bounds are converted to UTC before querying.

    Returns:
        A dict with ``tier``, ``operation``, the local ``date``, ``day_name``
        and ``display_date``, the list of ``events`` and their ``count``.
    """
    # Step one day ahead in local time, then snap back to midnight.
    same_time_tomorrow = datetime.now(LOCAL_TZ) + timedelta(days=1)
    day_begin = same_time_tomorrow.replace(hour=0, minute=0, second=0, microsecond=0)
    day_finish = day_begin + timedelta(days=1)

    # The API query is expressed in UTC.
    agenda = fetch_events(
        day_begin.astimezone(timezone.utc),
        day_finish.astimezone(timezone.utc),
    )

    return {
        "tier": "haiku",
        "operation": "tomorrow",
        "date": day_begin.strftime('%Y-%m-%d'),
        "day_name": day_begin.strftime('%A'),
        "display_date": day_begin.strftime('%A, %b %d'),
        "events": agenda,
        "count": len(agenda),
    }
|
|
|
|
|
|
def cmd_week() -> dict:
    """Get the next 7 local days of events, grouped by day.

    The window starts at today's midnight in ``LOCAL_TZ`` and covers seven
    calendar days; the bounds are converted to UTC for the API query.

    Returns:
        A dict with ``tier``, ``operation``, ``start_date`` (first day),
        ``end_date`` (the exclusive upper bound, i.e. the day after the last
        listed day), ``display_range`` (first day to last listed day,
        inclusive), ``days`` (seven per-day dicts with ``date``,
        ``day_name``, ``display_date`` and ``events``) and ``total_events``
        (number of events returned by the query).
    """
    # Use local time to determine the week
    now_local = datetime.now(LOCAL_TZ)
    start_local = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
    end_local = start_local + timedelta(days=7)   # exclusive query bound
    last_day_local = start_local + timedelta(days=6)  # last day actually listed

    # Convert to UTC for API query
    start_utc = start_local.astimezone(timezone.utc)
    end_utc = end_local.astimezone(timezone.utc)

    events = fetch_events(start_utc, end_utc, max_results=100)

    # Group by date (using local dates); every day gets an entry even if empty.
    by_day = {}
    for offset in range(7):
        day = start_local + timedelta(days=offset)
        day_str = day.strftime('%Y-%m-%d')
        by_day[day_str] = {
            "date": day_str,
            "day_name": day.strftime('%A'),
            "display_date": day.strftime('%A, %b %d'),
            "events": []
        }

    first_day_str = start_local.strftime('%Y-%m-%d')
    for event in events:
        day_str = event['date']
        # An event that began before the window but was still returned by the
        # query is shown on the first day rather than silently dropped.
        # YYYY-MM-DD strings order the same way as the dates they encode.
        if day_str < first_day_str:
            day_str = first_day_str
        bucket = by_day.get(day_str)
        if bucket is not None:
            bucket['events'].append(event)

    return {
        "tier": "haiku",
        "operation": "week",
        "start_date": start_local.strftime('%Y-%m-%d'),
        # Kept as the exclusive bound for backward compatibility.
        "end_date": end_local.strftime('%Y-%m-%d'),
        # Human-facing range ends on the last listed day, not the day after.
        "display_range": f"{start_local.strftime('%b %d')} - {last_day_local.strftime('%b %d')}",
        "days": list(by_day.values()),
        "total_events": len(events)
    }
|
|
|
|
|
|
def cmd_next() -> dict:
    """Get the next upcoming event within the coming 7 days.

    Returns:
        A dict with ``tier`` and ``operation`` plus either ``event`` (the
        first event whose start lies strictly after the current instant) or,
        when there is none, ``event: None`` and an explanatory ``message``.
    """
    now = datetime.now(timezone.utc)
    end = now + timedelta(days=7)  # Look ahead 7 days max

    # NOTE(review): per the Events.list documentation, timeMin bounds an
    # event's END time, so events already in progress are returned too.
    # Request more than a handful so they cannot crowd out the first event
    # that has not started yet.
    events = fetch_events(now, end, max_results=25)

    def start_instant(event: dict) -> datetime:
        """Parse an event's raw start string into an aware datetime."""
        raw = event['start']
        if 'T' in raw:
            begin = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        else:
            # All-day events carry only a date.
            begin = datetime.strptime(raw, '%Y-%m-%d')
        if begin.tzinfo is None:
            # Anchor offset-less values at local wall-clock time.
            begin = begin.replace(tzinfo=LOCAL_TZ)
        return begin

    # Filter to events that haven't started yet. Compare real instants: the
    # raw strings carry differing UTC offsets, so comparing them as text
    # gives wrong answers.
    upcoming = [e for e in events if start_instant(e) > now]

    if not upcoming:
        return {
            "tier": "haiku",
            "operation": "next",
            "event": None,
            "message": "No upcoming events in the next 7 days."
        }

    return {
        "tier": "haiku",
        "operation": "next",
        "event": upcoming[0]
    }
|
|
|
|
|
|
def cmd_summary() -> dict:
    """Summarize the coming week by delegating to the "sonnet" model.

    Fetches the week via ``cmd_week()``. With no events a fixed message is
    returned without calling the model. Otherwise a plain-text agenda is
    built and passed to ``delegate()``.

    Returns:
        A dict with ``tier``, ``operation`` and ``summary``; when the model
        was consulted it also carries ``total_events`` and ``usage``. If the
        delegation failed, ``summary`` holds the error text instead.
    """
    week = cmd_week()

    if week['total_events'] == 0:
        return {
            "tier": "haiku",
            "operation": "summary",
            "summary": "Your week is clear — no events scheduled."
        }

    # Assemble the agenda text piece by piece and join once at the end.
    pieces = [f"Calendar events for {week['display_range']}:\n\n"]
    for day in week['days']:
        pieces.append(f"--- {day['display_date']} ---\n")
        if day['events']:
            for entry in day['events']:
                line = f" {entry['start_formatted']}: {entry['title']}"
                # Durations of None or 0 are omitted.
                if entry['duration_mins']:
                    line += f" {format_duration(entry['duration_mins'])}"
                # Only mention attendees when there is more than one.
                if entry['attendee_count'] > 1:
                    line += f" [{entry['attendee_count']} attendees]"
                pieces.append(line + "\n")
        else:
            pieces.append("(no events)\n")
        pieces.append("\n")
    context = "".join(pieces)

    system = """You are a calendar analysis assistant. Provide concise, actionable summaries.
Focus on: busy vs free periods, meeting load, potential conflicts, and strategic insights.
Be brief — 2-4 sentences max. Highlight what's important."""

    prompt = f"""Analyze this calendar and provide a brief summary:

{context}

Key points: overall busyness, best times for deep work, anything notable."""

    outcome = delegate("sonnet", system, prompt, max_tokens=512)

    return {
        "tier": "sonnet",
        "operation": "summary",
        "total_events": week['total_events'],
        "summary": outcome.get("content", outcome.get("error", "Failed to analyze")),
        "usage": outcome.get("usage", {})
    }
|
|
|
|
|
|
def smart_default(now: "datetime | None" = None) -> str:
    """Return the default command based on the local time of day.

    Args:
        now: Moment to evaluate. When omitted, the current time in
            ``LOCAL_TZ`` is used so the choice agrees with the day boundaries
            the commands themselves use. An aware value is converted to
            ``LOCAL_TZ``; a naive value is taken as local wall-clock time.

    Returns:
        ``"today"`` before 18:00 local time, ``"tomorrow"`` from 18:00 on.
    """
    if now is None:
        now = datetime.now(LOCAL_TZ)
    elif now.tzinfo is not None:
        now = now.astimezone(LOCAL_TZ)
    return "today" if now.hour < 18 else "tomorrow"
|
|
|
|
|
|
def main():
    """Parse the command line, run the chosen command and print JSON.

    Without a subcommand the choice is made by ``smart_default()``. The
    result dict is printed as indented JSON. If the command raises an
    exception, a JSON ``{"error": ...}`` object is printed and the process
    exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Google Calendar operations with tiered delegation")
    subparsers = parser.add_subparsers(dest="command")

    # Subcommands and their help text
    for name, blurb in (
        ("today", "Today's agenda (Haiku tier)"),
        ("tomorrow", "Tomorrow's agenda (Haiku tier)"),
        ("week", "Next 7 days (Haiku tier)"),
        ("next", "Next upcoming event (Haiku tier)"),
        ("summary", "Smart week summary (Sonnet tier)"),
    ):
        subparsers.add_parser(name, help=blurb)

    args = parser.parse_args()

    # Smart default if no command
    command = args.command or smart_default()

    # Command name -> handler
    handlers = {
        "today": cmd_today,
        "tomorrow": cmd_tomorrow,
        "week": cmd_week,
        "next": cmd_next,
        "summary": cmd_summary,
    }

    try:
        handler = handlers.get(command)
        if handler is None:
            result = {"error": f"Unknown command: {command}"}
        else:
            result = handler()

        print(json.dumps(result, indent=2, default=str))

    except Exception as exc:
        print(json.dumps({"error": str(exc)}))
        sys.exit(1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|