Files
claude-code/mcp/delegation/gmail_delegate.py
OpenCode Test d9332ae118 Add tiered model delegation for gmail operations
Implements cost-efficient gmail operations by delegating to appropriate
model tiers via Claude CLI subprocess. Simple fetches use no LLM,
summarization and triage delegate to Sonnet, complex reasoning stays
with Opus (PA). Uses subscription instead of API key.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-31 21:35:32 -08:00

287 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Gmail Delegation Helper
Fetches emails via Gmail API, then delegates summarization to appropriate model tier.
Usage:
gmail_delegate.py check-unread [--days N]
gmail_delegate.py summarize --query "QUERY"
gmail_delegate.py urgent
"""
import sys
import os
import json
import argparse
import base64
import re
from collections import defaultdict
from pathlib import Path
# Set credentials path
os.environ["GMAIL_CREDENTIALS_PATH"] = str(Path.home() / ".gmail-mcp" / "credentials.json")
# Note: Run this script with ~/.claude/mcp/gmail/venv/bin/python
from gmail_mcp.utils.GCP.gmail_auth import get_gmail_service
import subprocess
# Claude CLI path
CLAUDE_CLI = "/home/linuxbrew/.linuxbrew/bin/claude"
def delegate(model: str, system: str, prompt: str, max_tokens: int = 4096) -> dict:
"""Delegate a task to Claude CLI using subscription."""
try:
# Build command
cmd = [
CLAUDE_CLI,
"--print",
"--model", model,
"--system-prompt", system,
"--output-format", "json",
prompt
]
# Run claude CLI
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120
)
if result.returncode != 0:
return {"error": f"Claude CLI error: {result.stderr}"}
# Parse JSON output
try:
output = json.loads(result.stdout)
# Extract text from the response
content = output.get("result", "")
return {
"success": True,
"model": model,
"content": content,
"usage": output.get("usage", {})
}
except json.JSONDecodeError:
# If not JSON, use raw text output
return {
"success": True,
"model": model,
"content": result.stdout.strip(),
"usage": {}
}
except subprocess.TimeoutExpired:
return {"error": "Claude CLI timed out"}
except FileNotFoundError:
return {"error": f"Claude CLI not found at {CLAUDE_CLI}"}
except Exception as e:
return {"error": f"Unexpected error: {e}"}
def fetch_emails(query: str, max_results: int = 25) -> list[dict]:
"""Fetch emails matching query, return metadata + snippets."""
service = get_gmail_service()
results = service.users().messages().list(
userId='me', q=query, maxResults=max_results
).execute()
emails = []
for msg in results.get('messages', []):
detail = service.users().messages().get(
userId='me', id=msg['id'], format='metadata',
metadataHeaders=['From', 'Subject', 'Date']
).execute()
headers = {h['name']: h['value'] for h in detail['payload']['headers']}
emails.append({
'id': msg['id'],
'from': headers.get('From', 'Unknown'),
'subject': headers.get('Subject', '(no subject)'),
'date': headers.get('Date', 'Unknown'),
'snippet': detail.get('snippet', '')[:200]
})
return emails
def fetch_email_body(msg_id: str) -> str:
"""Fetch full email body for summarization."""
service = get_gmail_service()
detail = service.users().messages().get(
userId='me', id=msg_id, format='full'
).execute()
payload = detail['payload']
def find_text(part):
if part.get('mimeType') == 'text/plain':
data = part['body'].get('data', '')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
if 'parts' in part:
for p in part['parts']:
result = find_text(p)
if result:
return result
return None
def find_html(part):
if part.get('mimeType') == 'text/html':
data = part['body'].get('data', '')
if data:
html = base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
# Strip HTML tags
text = re.sub(r'<style[^>]*>.*?</style>', '', html, flags=re.DOTALL)
text = re.sub(r'<script[^>]*>.*?</script>', '', text, flags=re.DOTALL)
text = re.sub(r'<[^>]+>', ' ', text)
text = re.sub(r'&nbsp;', ' ', text)
text = re.sub(r'&amp;', '&', text)
text = re.sub(r'\s+', ' ', text)
return text.strip()
if 'parts' in part:
for p in part['parts']:
result = find_html(p)
if result:
return result
return None
text = find_text(payload) or find_html(payload) or detail.get('snippet', '')
return text[:3000] # Limit for context window
def check_unread(days: int = 7) -> dict:
"""Check unread emails - Haiku tier operation."""
query = f"is:unread newer_than:{days}d"
emails = fetch_emails(query)
# Group by sender (simple operation, no LLM needed)
by_sender = defaultdict(list)
for email in emails:
sender = email['from'].split('<')[0].strip().strip('"')
by_sender[sender].append(email['subject'][:50])
return {
"tier": "haiku",
"operation": "check_unread",
"total": len(emails),
"by_sender": dict(by_sender)
}
def summarize_emails(query: str, max_results: int = 10) -> dict:
"""Summarize emails matching query - Sonnet tier operation."""
emails = fetch_emails(query, max_results)
if not emails:
return {"tier": "sonnet", "operation": "summarize", "summary": "No emails found."}
# Build context for summarization
context = "Emails to summarize:\n\n"
for i, email in enumerate(emails, 1):
body = fetch_email_body(email['id'])
context += f"--- Email {i} ---\n"
context += f"From: {email['from']}\n"
context += f"Subject: {email['subject']}\n"
context += f"Date: {email['date']}\n"
context += f"Content: {body[:1000]}\n\n"
# Delegate to Sonnet for summarization
system = """You are an email summarization assistant. Provide concise, actionable summaries.
Focus on: key points, action items, important dates, and who needs response.
Format: Use bullet points, group by topic if multiple related emails."""
prompt = f"""Summarize these emails concisely:
{context}
Provide a brief summary highlighting what's important and any action items."""
result = delegate("sonnet", system, prompt, max_tokens=1024)
return {
"tier": "sonnet",
"operation": "summarize",
"email_count": len(emails),
"summary": result.get("content", result.get("error", "Failed to summarize")),
"usage": result.get("usage", {})
}
def check_urgent() -> dict:
"""Check for urgent emails - Haiku fetch + Sonnet analysis."""
query = 'is:unread newer_than:3d (subject:urgent OR subject:asap OR subject:"action required" OR is:important)'
emails = fetch_emails(query, max_results=15)
if not emails:
return {"tier": "haiku", "operation": "urgent", "message": "No urgent emails found."}
# For urgent, we want Sonnet to prioritize
context = "Potentially urgent emails:\n\n"
for email in emails:
context += f"- From: {email['from']}\n"
context += f" Subject: {email['subject']}\n"
context += f" Date: {email['date']}\n"
context += f" Preview: {email['snippet']}\n\n"
system = """You are an email triage assistant. Identify truly urgent items that need immediate attention.
Distinguish between: actually urgent (needs response today), important (needs response soon), and FYI (can wait)."""
prompt = f"""Triage these flagged emails by urgency:
{context}
List any that are truly urgent first, then important, then FYI."""
result = delegate("sonnet", system, prompt, max_tokens=1024)
return {
"tier": "sonnet",
"operation": "urgent",
"email_count": len(emails),
"triage": result.get("content", result.get("error", "Failed to triage")),
"usage": result.get("usage", {})
}
def main():
parser = argparse.ArgumentParser(description="Gmail operations with tiered delegation")
subparsers = parser.add_subparsers(dest="command", required=True)
# check-unread
unread = subparsers.add_parser("check-unread", help="List unread emails (Haiku tier)")
unread.add_argument("--days", "-d", type=int, default=7, help="Days to look back")
# summarize
summarize = subparsers.add_parser("summarize", help="Summarize emails (Sonnet tier)")
summarize.add_argument("--query", "-q", required=True, help="Gmail search query")
summarize.add_argument("--max", "-m", type=int, default=10, help="Max emails to summarize")
# urgent
subparsers.add_parser("urgent", help="Check urgent emails (Sonnet tier)")
args = parser.parse_args()
try:
if args.command == "check-unread":
result = check_unread(args.days)
elif args.command == "summarize":
result = summarize_emails(args.query, args.max)
elif args.command == "urgent":
result = check_urgent()
else:
result = {"error": f"Unknown command: {args.command}"}
print(json.dumps(result, indent=2))
except Exception as e:
print(json.dumps({"error": str(e)}))
sys.exit(1)
if __name__ == "__main__":
main()