Implement /gcal Google Calendar integration

Components:
- commands/gcal.md: Slash command with aliases (calendar, cal)
- skills/gcal/SKILL.md: Usage patterns, routing logic, output formats
- mcp/delegation/gcal_delegate.py: Python API wrapper with tiered delegation

Features:
- Subcommands: today, tomorrow, week, next, summary
- Smart default (today before 6pm, tomorrow after)
- Hybrid interface (subcommands + natural language)
- Haiku tier for fetch/format, Sonnet tier for analysis

Requires OAuth setup: enable Calendar API and authorize.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
OpenCode Test
2025-12-31 22:12:28 -08:00
parent aacaf57540
commit 5e03b4a9c1
3 changed files with 560 additions and 0 deletions

390
mcp/delegation/gcal_delegate.py Executable file
View File

@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
Google Calendar Delegation Helper
Fetches calendar events via Google Calendar API, delegates analysis to appropriate model tier.
Usage:
gcal_delegate.py today
gcal_delegate.py tomorrow
gcal_delegate.py week
gcal_delegate.py next
gcal_delegate.py summary
"""
import sys
import os
import json
import argparse
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path
# OAuth setup - reuse Gmail credentials location
CREDENTIALS_PATH = Path.home() / ".gmail-mcp" / "credentials.json"
TOKEN_PATH = Path.home() / ".gmail-mcp" / "calendar_token.json"
SCOPES = ['https://www.googleapis.com/auth/calendar.readonly']
# Claude CLI path
CLAUDE_CLI = "/home/linuxbrew/.linuxbrew/bin/claude"
def get_calendar_service():
"""Authenticate and return Calendar API service."""
try:
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
except ImportError:
print(json.dumps({
"error": "Missing Google API libraries. Install with: pip install google-auth-oauthlib google-api-python-client"
}))
sys.exit(1)
creds = None
# Load existing token
if TOKEN_PATH.exists():
creds = Credentials.from_authorized_user_file(str(TOKEN_PATH), SCOPES)
# Refresh or create new credentials
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
try:
creds.refresh(Request())
except Exception:
creds = None
if not creds:
if not CREDENTIALS_PATH.exists():
print(json.dumps({
"error": f"OAuth credentials not found at {CREDENTIALS_PATH}. Set up Google Cloud OAuth first."
}))
sys.exit(1)
flow = InstalledAppFlow.from_client_secrets_file(str(CREDENTIALS_PATH), SCOPES)
creds = flow.run_local_server(port=0)
# Save token for future use
with open(TOKEN_PATH, 'w') as token:
token.write(creds.to_json())
return build('calendar', 'v3', credentials=creds)
def fetch_events(start: datetime, end: datetime, max_results: int = 50) -> list[dict]:
"""Fetch calendar events in time range."""
service = get_calendar_service()
events_result = service.events().list(
calendarId='primary',
timeMin=start.isoformat() + 'Z',
timeMax=end.isoformat() + 'Z',
maxResults=max_results,
singleEvents=True,
orderBy='startTime'
).execute()
events = []
for event in events_result.get('items', []):
# Parse start time
start_raw = event['start'].get('dateTime', event['start'].get('date'))
if 'T' in start_raw:
start_dt = datetime.fromisoformat(start_raw.replace('Z', '+00:00'))
all_day = False
else:
start_dt = datetime.strptime(start_raw, '%Y-%m-%d')
all_day = True
# Parse end time
end_raw = event['end'].get('dateTime', event['end'].get('date'))
if 'T' in end_raw:
end_dt = datetime.fromisoformat(end_raw.replace('Z', '+00:00'))
else:
end_dt = datetime.strptime(end_raw, '%Y-%m-%d')
# Calculate duration
duration_mins = int((end_dt - start_dt).total_seconds() / 60) if not all_day else None
# Extract location (could be URL or physical)
location = event.get('location', '')
hangout_link = event.get('hangoutLink', '')
meeting_link = hangout_link or (location if location.startswith('http') else '')
physical_location = location if location and not location.startswith('http') else ''
# Attendee count
attendees = event.get('attendees', [])
attendee_count = len(attendees)
events.append({
'id': event['id'],
'title': event.get('summary', '(no title)'),
'start': start_raw,
'start_formatted': start_dt.strftime('%I:%M %p').lstrip('0') if not all_day else 'All day',
'end': end_raw,
'duration_mins': duration_mins,
'all_day': all_day,
'location': physical_location,
'meeting_link': meeting_link,
'attendee_count': attendee_count,
'description': (event.get('description', '') or '')[:100],
'date': start_dt.strftime('%Y-%m-%d'),
'day_name': start_dt.strftime('%A'),
})
return events
def format_duration(mins: int) -> str:
"""Format duration in human-readable form."""
if mins is None:
return ""
if mins < 60:
return f"({mins}m)"
hours = mins // 60
remaining = mins % 60
if remaining == 0:
return f"({hours}h)"
return f"({hours}h {remaining}m)"
def delegate(model: str, system: str, prompt: str, max_tokens: int = 4096) -> dict:
"""Delegate a task to Claude CLI using subscription."""
try:
cmd = [
CLAUDE_CLI,
"--print",
"--model", model,
"--system-prompt", system,
"--output-format", "json",
prompt
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120
)
if result.returncode != 0:
return {"error": f"Claude CLI error: {result.stderr}"}
try:
output = json.loads(result.stdout)
return {
"success": True,
"model": model,
"content": output.get("result", ""),
"usage": output.get("usage", {})
}
except json.JSONDecodeError:
return {
"success": True,
"model": model,
"content": result.stdout.strip(),
"usage": {}
}
except subprocess.TimeoutExpired:
return {"error": "Claude CLI timed out"}
except FileNotFoundError:
return {"error": f"Claude CLI not found at {CLAUDE_CLI}"}
except Exception as e:
return {"error": f"Unexpected error: {e}"}
def cmd_today() -> dict:
"""Get today's events."""
now = datetime.now(timezone.utc)
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
events = fetch_events(start, end)
return {
"tier": "haiku",
"operation": "today",
"date": start.strftime('%Y-%m-%d'),
"day_name": start.strftime('%A'),
"display_date": start.strftime('%A, %b %d'),
"events": events,
"count": len(events)
}
def cmd_tomorrow() -> dict:
"""Get tomorrow's events."""
now = datetime.now(timezone.utc)
start = (now + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=1)
events = fetch_events(start, end)
return {
"tier": "haiku",
"operation": "tomorrow",
"date": start.strftime('%Y-%m-%d'),
"day_name": start.strftime('%A'),
"display_date": start.strftime('%A, %b %d'),
"events": events,
"count": len(events)
}
def cmd_week() -> dict:
"""Get next 7 days of events, grouped by day."""
now = datetime.now(timezone.utc)
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
end = start + timedelta(days=7)
events = fetch_events(start, end, max_results=100)
# Group by date
by_day = {}
for i in range(7):
day = start + timedelta(days=i)
day_str = day.strftime('%Y-%m-%d')
by_day[day_str] = {
"date": day_str,
"day_name": day.strftime('%A'),
"display_date": day.strftime('%A, %b %d'),
"events": []
}
for event in events:
day_str = event['date']
if day_str in by_day:
by_day[day_str]['events'].append(event)
return {
"tier": "haiku",
"operation": "week",
"start_date": start.strftime('%Y-%m-%d'),
"end_date": end.strftime('%Y-%m-%d'),
"display_range": f"{start.strftime('%b %d')} - {end.strftime('%b %d')}",
"days": list(by_day.values()),
"total_events": len(events)
}
def cmd_next() -> dict:
"""Get next upcoming event."""
now = datetime.now(timezone.utc)
end = now + timedelta(days=7) # Look ahead 7 days max
events = fetch_events(now, end, max_results=5)
# Filter to events that haven't started yet
upcoming = [e for e in events if e['start'] > now.isoformat()]
if not upcoming:
return {
"tier": "haiku",
"operation": "next",
"event": None,
"message": "No upcoming events in the next 7 days."
}
return {
"tier": "haiku",
"operation": "next",
"event": upcoming[0]
}
def cmd_summary() -> dict:
"""Get smart summary of the week using Sonnet."""
# First fetch the week's events
week_data = cmd_week()
if week_data['total_events'] == 0:
return {
"tier": "haiku",
"operation": "summary",
"summary": "Your week is clear — no events scheduled."
}
# Build context for Sonnet
context = f"Calendar events for {week_data['display_range']}:\n\n"
for day in week_data['days']:
context += f"--- {day['display_date']} ---\n"
if not day['events']:
context += "(no events)\n"
else:
for event in day['events']:
context += f" {event['start_formatted']}: {event['title']}"
if event['duration_mins']:
context += f" {format_duration(event['duration_mins'])}"
if event['attendee_count'] > 1:
context += f" [{event['attendee_count']} attendees]"
context += "\n"
context += "\n"
system = """You are a calendar analysis assistant. Provide concise, actionable summaries.
Focus on: busy vs free periods, meeting load, potential conflicts, and strategic insights.
Be brief — 2-4 sentences max. Highlight what's important."""
prompt = f"""Analyze this calendar and provide a brief summary:
{context}
Key points: overall busyness, best times for deep work, anything notable."""
result = delegate("sonnet", system, prompt, max_tokens=512)
return {
"tier": "sonnet",
"operation": "summary",
"total_events": week_data['total_events'],
"summary": result.get("content", result.get("error", "Failed to analyze")),
"usage": result.get("usage", {})
}
def smart_default() -> str:
"""Return default command based on time of day."""
hour = datetime.now().hour
return "today" if hour < 18 else "tomorrow"
def main():
parser = argparse.ArgumentParser(description="Google Calendar operations with tiered delegation")
subparsers = parser.add_subparsers(dest="command")
# Subcommands
subparsers.add_parser("today", help="Today's agenda (Haiku tier)")
subparsers.add_parser("tomorrow", help="Tomorrow's agenda (Haiku tier)")
subparsers.add_parser("week", help="Next 7 days (Haiku tier)")
subparsers.add_parser("next", help="Next upcoming event (Haiku tier)")
subparsers.add_parser("summary", help="Smart week summary (Sonnet tier)")
args = parser.parse_args()
# Smart default if no command
command = args.command or smart_default()
try:
if command == "today":
result = cmd_today()
elif command == "tomorrow":
result = cmd_tomorrow()
elif command == "week":
result = cmd_week()
elif command == "next":
result = cmd_next()
elif command == "summary":
result = cmd_summary()
else:
result = {"error": f"Unknown command: {command}"}
print(json.dumps(result, indent=2, default=str))
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
if __name__ == "__main__":
main()