Files
flynn/src/automation/gmail.ts
T

638 lines
21 KiB
TypeScript

import { google, type Auth } from 'googleapis';
import { readFileSync, writeFileSync, existsSync, mkdirSync, chmodSync } from 'fs';
import { dirname, resolve } from 'path';
import { homedir } from 'os';
import type { v1 } from '@google-cloud/pubsub';
import type { GmailConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { parseInterval } from './heartbeat.js';
import { sanitizeHtml } from '../utils/html.js';
import { auditLogger } from '../audit/index.js';
/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
/** Parsed email information used for template rendering. */
interface EmailInfo {
id: string;
from: string;
to: string;
subject: string;
snippet: string;
date: string;
labels: string[];
}
/** Pub/Sub push notification payload from Google. */
interface PubSubNotification {
emailAddress: string;
historyId: string;
}
const DEFAULT_TOPIC_ID = 'gmail-push';
// Watch expires after ~7 days; renew at 6 days (in ms).
const WATCH_RENEWAL_MS = 6 * 24 * 60 * 60 * 1000;
/**
* GmailWatcher monitors a Gmail inbox for new messages and forwards them
* as InboundMessages via the channel adapter pattern.
*
* Supports two modes:
* - **Pub/Sub push**: Google sends push notifications when new emails arrive.
* Requires a POST /gmail/push route on the gateway.
* - **Polling fallback**: Periodically polls Gmail History API for changes.
*
* Authentication uses OAuth2 with a stored refresh token.
*/
export class GmailWatcher implements ChannelAdapter {
readonly name = 'gmail';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private oauth2Client?: Auth.OAuth2Client;
private lastHistoryId?: string;
private pollTimer?: ReturnType<typeof setInterval>;
private watchTimer?: ReturnType<typeof setInterval>;
private pullTimer?: ReturnType<typeof setInterval>;
private pubsubSubscriber?: v1.SubscriberClient;
private pullInFlight = false;
private readonly config: NonNullable<GmailConfig>;
private googleProjectId?: string;
constructor(
config: NonNullable<GmailConfig>,
private readonly channelLookup: ChannelLookup,
) {
this.config = config;
}
get status(): ChannelStatus {
return this._status;
}
async connect(): Promise<void> {
this._status = 'connecting';
try {
this.oauth2Client = await this.authorize();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
console.error(`GmailWatcher: Authorization failed — ${errMsg}`);
console.error('GmailWatcher: Run "flynn gmail-auth" to set up Gmail credentials.');
this._status = 'error';
return;
}
// Set up Gmail push watch (Pub/Sub). Polling is always enabled.
if (!this.config.disable_push) {
try {
await this.setupWatch();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
const hint = this.buildWatchErrorHint(errMsg);
console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}${hint}`);
}
} else {
const configured = (this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC ?? '').trim();
if (configured) {
console.log('GmailWatcher: Push disabled (disable_push=true)');
}
}
// Set up Pub/Sub pull subscription (optional).
try {
await this.setupPullSubscription();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
console.warn(`GmailWatcher: Pull setup failed (will continue without pull) — ${errMsg}`);
}
// Start polling fallback
const pollMs = parseInterval(this.config.poll_interval ?? '300s');
this.pollTimer = setInterval(() => {
this.pollForNewMessages().catch((err) => {
console.error('GmailWatcher: Poll error —', err instanceof Error ? err.message : err);
});
}, pollMs);
this._status = 'connected';
const modes: string[] = [];
const pushConfigured = Boolean((this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC ?? '').trim());
const pullConfigured = Boolean((this.config.pubsub_subscription_id ?? '').trim());
if (pushConfigured && !this.config.disable_push) {modes.push('push');}
if (pullConfigured) {modes.push('pull');}
modes.push('poll');
console.log(
`GmailWatcher: Connected (${modes.join('+')}, poll_interval=${this.config.poll_interval ?? '300s'}${pullConfigured ? `, pubsub_pull_interval=${this.config.pubsub_pull_interval ?? '60s'}` : ''})`,
);
auditLogger?.systemStart('GmailWatcher', {
modes: modes.join('+'),
poll_interval: this.config.poll_interval,
pubsub_topic: pushConfigured ? 'configured' : 'none',
pubsub_subscription_id: pullConfigured ? 'configured' : 'none',
});
}
async disconnect(): Promise<void> {
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = undefined;
}
if (this.watchTimer) {
clearInterval(this.watchTimer);
this.watchTimer = undefined;
}
if (this.pullTimer) {
clearInterval(this.pullTimer);
this.pullTimer = undefined;
}
if (this.pubsubSubscriber) {
try {
await this.pubsubSubscriber.close();
} catch {
// Ignore shutdown errors
}
this.pubsubSubscriber = undefined;
}
this.oauth2Client = undefined;
this._status = 'disconnected';
auditLogger?.systemStop('GmailWatcher');
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
// Route responses to the configured output channel
const outputAdapter = this.channelLookup.get(this.config.output.channel);
if (!outputAdapter) {
console.warn(`GmailWatcher: Output channel '${this.config.output.channel}' not found`);
return;
}
await outputAdapter.send(this.config.output.peer, message);
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/**
* Handle a Pub/Sub push notification from Google.
* Called by the gateway when POST /gmail/push is received.
* @param data Base64-encoded Pub/Sub message data
*/
async handlePushNotification(data: string): Promise<void> {
try {
const decoded = Buffer.from(data, 'base64').toString('utf-8');
const notification = JSON.parse(decoded) as PubSubNotification;
if (!notification.historyId) {
console.warn('GmailWatcher: Push notification missing historyId');
return;
}
// Only process if the new historyId is greater than our last known one
if (this.lastHistoryId && BigInt(notification.historyId) <= BigInt(this.lastHistoryId)) {
return;
}
if (this.lastHistoryId) {
await this.processHistoryChanges(this.lastHistoryId);
}
this.lastHistoryId = notification.historyId;
} catch (error) {
console.error('GmailWatcher: Push notification error —', error instanceof Error ? error.message : error);
}
}
/**
* Authorize with Gmail using stored OAuth2 credentials.
* Reads client credentials and stored token from config paths.
*/
private async authorize(): Promise<Auth.OAuth2Client> {
const credentialsPath = this.config.credentials_file;
if (!credentialsPath) {
throw new Error('No credentials_file configured. Set automation.gmail.credentials_file in config.');
}
const expandedCredsPath = this.expandPath(credentialsPath);
if (!existsSync(expandedCredsPath)) {
throw new Error(`Credentials file not found: ${expandedCredsPath}`);
}
const credentials = JSON.parse(readFileSync(expandedCredsPath, 'utf-8'));
const { client_id, client_secret, redirect_uris, project_id } = credentials.installed ?? credentials.web ?? {};
if (project_id && typeof project_id === 'string') {
this.googleProjectId = project_id;
}
if (!client_id || !client_secret) {
throw new Error('Invalid credentials file — missing client_id or client_secret');
}
const oauth2Client = new google.auth.OAuth2(
client_id,
client_secret,
redirect_uris?.[0] ?? 'http://localhost',
);
// Load stored token
const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json');
if (!existsSync(tokenPath)) {
throw new Error(
`Token file not found: ${tokenPath}. Run "flynn gmail-auth" to authenticate.`,
);
}
const token = JSON.parse(readFileSync(tokenPath, 'utf-8'));
oauth2Client.setCredentials(token);
// Auto-save refreshed tokens
oauth2Client.on('tokens', (newTokens) => {
const merged = { ...token, ...newTokens };
this.saveToken(merged);
});
return oauth2Client;
}
/**
* Set up Gmail Pub/Sub watch for push notifications.
* Calls gmail.users.watch() and schedules renewal before expiry.
*/
private async setupWatch(): Promise<void> {
if (!this.oauth2Client) {return;}
if (this.watchTimer) {
clearInterval(this.watchTimer);
this.watchTimer = undefined;
}
const topicName = this.resolvePubSubTopicName();
if (!topicName) {
// Push notifications are optional; polling is always enabled.
return;
}
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
const watchResponse = await gmail.users.watch({
userId: 'me',
requestBody: {
labelIds: this.config.watch_labels ?? ['INBOX'],
topicName,
},
});
if (watchResponse.data.historyId) {
this.lastHistoryId = watchResponse.data.historyId.toString();
}
console.log(`GmailWatcher: Watch registered (historyId=${this.lastHistoryId})`);
// Schedule renewal before watch expiry (~7 days)
this.watchTimer = setInterval(() => {
this.setupWatch().catch((err) => {
console.error('GmailWatcher: Watch renewal failed —', err instanceof Error ? err.message : err);
});
}, WATCH_RENEWAL_MS);
}
private buildWatchErrorHint(errMsg: string): string {
const hints: string[] = [];
if (errMsg.includes('Invalid topicName')) {
hints.push(
`Tip: set automation.gmail.pubsub_topic to "projects/${this.googleProjectId ?? '<project-id>'}/topics/${DEFAULT_TOPIC_ID}"`,
);
}
if (/permission denied|PERMISSION_DENIED/i.test(errMsg)) {
hints.push('Tip: ensure Gmail has permission to publish to the Pub/Sub topic (IAM)');
}
hints.push('Tip: if Google cannot reach your gateway, set automation.gmail.pubsub_subscription_id for pull mode');
return hints.length > 0 ? `\n ${hints.join('\n ')}` : '';
}
/**
* Resolve the Pub/Sub topic resource name for Gmail push notifications.
*
* Priority:
* 1) automation.gmail.pubsub_topic
* 2) FLYNN_GMAIL_PUBSUB_TOPIC env var
* If neither is provided, push notifications are disabled.
*/
private resolvePubSubTopicName(): string | null {
const configured = this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC;
let topic = (configured ?? '').trim();
if (!topic) {return null;}
// Allow shorthand: just the topic id (e.g. "gmail-push")
if (!topic.includes('/')) {
if (!this.googleProjectId) {
throw new Error(
`pubsub_topic '${topic}' must be fully qualified (projects/<project-id>/topics/<topic>) because project_id was not found in credentials`,
);
}
topic = `projects/${this.googleProjectId}/topics/${topic}`;
}
const isValid = /^projects\/[^/]+\/topics\/[^/]+$/.test(topic);
if (!isValid) {
throw new Error(
`Invalid pubsub_topic '${topic}'. Expected: projects/<project-id>/topics/<topic>`,
);
}
return topic;
}
private resolvePubSubSubscriptionName(): string | null {
let sub = (this.config.pubsub_subscription_id ?? '').trim();
if (!sub) {return null;}
// Allow shorthand: just the subscription id (e.g. "gmail-pull")
if (!sub.includes('/')) {
if (!this.googleProjectId) {
throw new Error(
`pubsub_subscription_id '${sub}' must be fully qualified (projects/<project-id>/subscriptions/<subscription>) because project_id was not found in credentials`,
);
}
sub = `projects/${this.googleProjectId}/subscriptions/${sub}`;
}
const isValid = /^projects\/[^/]+\/subscriptions\/[^/]+$/.test(sub);
if (!isValid) {
throw new Error(
`Invalid pubsub_subscription_id '${sub}'. Expected: projects/<project-id>/subscriptions/<subscription>`,
);
}
return sub;
}
private async setupPullSubscription(): Promise<void> {
const subscriptionName = this.resolvePubSubSubscriptionName();
if (!subscriptionName) {return;}
if (this.pullTimer) {
clearInterval(this.pullTimer);
this.pullTimer = undefined;
}
const pullMs = parseInterval(this.config.pubsub_pull_interval ?? '60s');
// Kick once immediately, then on interval.
await this.pullSubscriptionMessages().catch((err) => {
console.error('GmailWatcher: Pub/Sub pull error —', err instanceof Error ? err.message : err);
});
this.pullTimer = setInterval(() => {
this.pullSubscriptionMessages().catch((err) => {
console.error('GmailWatcher: Pub/Sub pull error —', err instanceof Error ? err.message : err);
});
}, pullMs);
console.log(
`GmailWatcher: Pull enabled (subscription=${subscriptionName}, interval=${this.config.pubsub_pull_interval ?? '60s'})`,
);
}
private async getSubscriberClient(): Promise<v1.SubscriberClient> {
if (this.pubsubSubscriber) {return this.pubsubSubscriber;}
const mod = await import('@google-cloud/pubsub');
this.pubsubSubscriber = new mod.v1.SubscriberClient();
return this.pubsubSubscriber;
}
private async pullSubscriptionMessages(): Promise<void> {
const subscription = this.resolvePubSubSubscriptionName();
if (!subscription) {return;}
if (this.pullInFlight) {return;}
this.pullInFlight = true;
try {
const client = await this.getSubscriberClient();
const maxMessages = this.config.pubsub_max_messages ?? 10;
const [response] = await client.pull({
subscription,
maxMessages,
});
const received = response.receivedMessages ?? [];
if (received.length === 0) {return;}
const ackIds: string[] = [];
for (const receivedMessage of received) {
const ackId = receivedMessage.ackId;
const data = receivedMessage.message?.data;
if (!ackId || !data) {continue;}
const base64 = Buffer.from(data as Uint8Array).toString('base64');
try {
await this.handlePushNotification(base64);
ackIds.push(ackId);
} catch {
// If processing fails, leave message unacked for retry.
}
}
if (ackIds.length > 0) {
await client.acknowledge({ subscription, ackIds });
}
} finally {
this.pullInFlight = false;
}
}
/**
* Poll Gmail History API for new messages since lastHistoryId.
* Fallback mechanism when Pub/Sub push is not available.
*/
private async pollForNewMessages(): Promise<void> {
if (!this.oauth2Client) {return;}
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
// If no historyId yet, initialize it from the profile
if (!this.lastHistoryId) {
try {
const profile = await gmail.users.getProfile({ userId: 'me' });
if (profile.data.historyId) {
this.lastHistoryId = profile.data.historyId.toString();
}
} catch (error) {
console.error('GmailWatcher: Failed to get profile —', error instanceof Error ? error.message : error);
}
return; // First poll — just establish the baseline
}
await this.processHistoryChanges(this.lastHistoryId);
}
/**
* Fetch history changes since the given historyId and process new messages.
* Updates lastHistoryId to the latest value from the response.
*/
private async processHistoryChanges(startHistoryId: string): Promise<void> {
if (!this.oauth2Client) {return;}
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
try {
const historyResponse = await gmail.users.history.list({
userId: 'me',
startHistoryId: startHistoryId,
labelId: (this.config.watch_labels ?? ['INBOX'])[0],
historyTypes: ['messageAdded'],
});
const history = historyResponse.data.history ?? [];
const processedIds = new Set<string>();
for (const record of history) {
const addedMessages = record.messagesAdded ?? [];
for (const added of addedMessages) {
const messageId = added.message?.id;
if (!messageId || processedIds.has(messageId)) {continue;}
processedIds.add(messageId);
const email = await this.getMessageDetails(messageId);
if (!email) {continue;}
// Skip messages before history_start if configured
if (this.config.history_start) {
const emailDate = new Date(email.date);
const startDate = new Date(this.config.history_start);
if (emailDate < startDate) {continue;}
}
const text = this.renderTemplate(email);
const msg: InboundMessage = {
id: `gmail-${email.id}-${Date.now()}`,
channel: 'gmail',
senderId: email.from,
senderName: `gmail:${email.from}`,
text,
timestamp: Date.now(),
metadata: {
emailId: email.id,
from: email.from,
to: email.to,
subject: email.subject,
labels: email.labels,
},
};
this.messageHandler?.(msg);
}
}
// Update historyId to the latest
if (historyResponse.data.historyId) {
this.lastHistoryId = historyResponse.data.historyId.toString();
}
} catch (error: unknown) {
// 404 means historyId is too old — reset by fetching profile
if (error instanceof Error && 'code' in error && (error as { code: number }).code === 404) {
console.warn('GmailWatcher: History expired, re-syncing...');
try {
const profile = await gmail.users.getProfile({ userId: 'me' });
if (profile.data.historyId) {
this.lastHistoryId = profile.data.historyId.toString();
}
} catch (profileError) {
console.error('GmailWatcher: Failed to re-sync profile —', profileError instanceof Error ? profileError.message : profileError);
}
} else {
throw error;
}
}
}
/**
* Fetch full message details by ID and extract relevant headers.
*/
private async getMessageDetails(messageId: string): Promise<EmailInfo | null> {
if (!this.oauth2Client) {return null;}
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
try {
const msg = await gmail.users.messages.get({
userId: 'me',
id: messageId,
format: 'metadata',
metadataHeaders: ['From', 'To', 'Subject', 'Date'],
});
const headers = msg.data.payload?.headers ?? [];
const getHeader = (name: string): string =>
headers.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value ?? '';
return {
id: messageId,
from: getHeader('From'),
to: getHeader('To'),
subject: getHeader('Subject'),
snippet: sanitizeHtml(msg.data.snippet ?? ''),
date: getHeader('Date'),
labels: msg.data.labelIds ?? [],
};
} catch (error) {
console.error(`GmailWatcher: Failed to fetch message ${messageId}`, error instanceof Error ? error.message : error);
return null;
}
}
/**
* Render the message template with email placeholders.
* Supported: {{from}}, {{to}}, {{subject}}, {{snippet}}, {{date}}, {{id}}, {{labels}}
*/
renderTemplate(email: EmailInfo): string {
return this.config.message
.replace(/\{\{from\}\}/g, email.from)
.replace(/\{\{to\}\}/g, email.to)
.replace(/\{\{subject\}\}/g, email.subject)
.replace(/\{\{snippet\}\}/g, email.snippet)
.replace(/\{\{date\}\}/g, email.date)
.replace(/\{\{id\}\}/g, email.id)
.replace(/\{\{labels\}\}/g, email.labels.join(', '));
}
/**
* Expand ~ to the user's home directory.
*/
expandPath(p: string): string {
if (p.startsWith('~/') || p === '~') {
return resolve(homedir(), p.slice(2));
}
return resolve(p);
}
/**
* Save token to disk with restrictive permissions (0o600).
*/
private saveToken(token: unknown): void {
const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json');
const dir = dirname(tokenPath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
writeFileSync(tokenPath, JSON.stringify(token, null, 2), 'utf-8');
try {
chmodSync(tokenPath, 0o600);
} catch {
// chmod may fail on some filesystems — not critical
}
}
}