import { google, type Auth } from 'googleapis';
import { readFileSync, writeFileSync, existsSync, mkdirSync, chmodSync } from 'fs';
import { dirname, resolve } from 'path';
import { homedir } from 'os';
import type { v1 } from '@google-cloud/pubsub';
import type { GmailConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { parseInterval } from './heartbeat.js';
import { sanitizeHtml } from '../utils/html.js';
import { auditLogger } from '../audit/index.js';

/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}

/** Parsed email information used for template rendering. */
interface EmailInfo {
  id: string;
  from: string;
  to: string;
  subject: string;
  snippet: string;
  date: string;
  labels: string[];
}

/** Pub/Sub push notification payload from Google. */
interface PubSubNotification {
  emailAddress: string;
  // NOTE(review): accepted as string or number because the value comes straight
  // from JSON.parse; confirm which form Google actually sends.
  historyId: string | number;
}

const DEFAULT_TOPIC_ID = 'gmail-push';

// Watch expires after ~7 days; renew at 6 days (in ms).
const WATCH_RENEWAL_MS = 6 * 24 * 60 * 60 * 1000;

const DEFAULT_TOKEN_FILE = '~/.config/flynn/gmail-token.json';
const DEFAULT_POLL_INTERVAL = '300s';
const DEFAULT_PULL_INTERVAL = '60s';

// Matches every placeholder renderTemplate() supports, in a single pass.
const TEMPLATE_PLACEHOLDER = /\{\{(from|to|subject|snippet|date|id|labels)\}\}/g;

/**
 * GmailWatcher monitors a Gmail inbox for new messages and forwards them
 * as InboundMessages via the channel adapter pattern.
 *
 * Supports these delivery modes:
 * - **Pub/Sub push**: Google sends push notifications when new emails arrive.
 *   Requires a POST /gmail/push route on the gateway.
 * - **Pub/Sub pull**: Periodically pulls notifications from a subscription.
 * - **Polling fallback**: Periodically polls Gmail History API for changes.
 *   Always enabled.
 *
 * Authentication uses OAuth2 with a stored refresh token.
 */
export class GmailWatcher implements ChannelAdapter {
  readonly name = 'gmail';

  private _status: ChannelStatus = 'disconnected';
  private messageHandler?: (msg: InboundMessage) => void;
  private oauth2Client?: Auth.OAuth2Client;
  /** Cursor into the Gmail history stream; only ever moved forward by notifications. */
  private lastHistoryId?: string;
  private pollTimer?: ReturnType<typeof setInterval>;
  private watchTimer?: ReturnType<typeof setInterval>;
  private pullTimer?: ReturnType<typeof setInterval>;
  private pubsubSubscriber?: v1.SubscriberClient;
  /** Guards against overlapping pull cycles when a pull outlasts its interval. */
  private pullInFlight = false;
  private readonly config: NonNullable<GmailConfig>;
  /** project_id read from the credentials file; used to expand shorthand Pub/Sub names. */
  private googleProjectId?: string;

  constructor(
    config: NonNullable<GmailConfig>,
    private readonly channelLookup: ChannelLookup,
  ) {
    this.config = config;
  }

  get status(): ChannelStatus {
    return this._status;
  }

  /**
   * Authorize, then start push watch (unless disabled), optional Pub/Sub pull,
   * and the polling timer. Authorization failure sets status to 'error' and
   * returns without throwing; push/pull setup failures are logged and ignored.
   */
  async connect(): Promise<void> {
    this._status = 'connecting';

    try {
      this.oauth2Client = await this.authorize();
    } catch (error) {
      const errMsg = error instanceof Error ? error.message : 'Unknown error';
      console.error(`GmailWatcher: Authorization failed — ${errMsg}`);
      console.error('GmailWatcher: Run "flynn gmail-auth" to set up Gmail credentials.');
      this._status = 'error';
      return;
    }

    // Set up Gmail push watch (Pub/Sub). Polling is always enabled.
    if (!this.config.disable_push) {
      try {
        await this.setupWatch();
      } catch (error) {
        const errMsg = error instanceof Error ? error.message : 'Unknown error';
        const hint = this.buildWatchErrorHint(errMsg);
        console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}${hint}`);
      }
    } else if (this.configuredTopic()) {
      console.log('GmailWatcher: Push disabled (disable_push=true)');
    }

    // Set up Pub/Sub pull subscription (optional).
    try {
      await this.setupPullSubscription();
    } catch (error) {
      const errMsg = error instanceof Error ? error.message : 'Unknown error';
      console.warn(`GmailWatcher: Pull setup failed (will continue without pull) — ${errMsg}`);
    }

    // Start polling fallback. Clear any previous timer so a repeated connect()
    // does not leave two pollers running.
    if (this.pollTimer) {
      clearInterval(this.pollTimer);
    }
    const pollMs = parseInterval(this.config.poll_interval ?? DEFAULT_POLL_INTERVAL);
    this.pollTimer = setInterval(() => {
      this.pollForNewMessages().catch((err) => {
        console.error('GmailWatcher: Poll error —', err instanceof Error ? err.message : err);
      });
    }, pollMs);

    this._status = 'connected';

    const pushConfigured = Boolean(this.configuredTopic());
    const pullConfigured = Boolean((this.config.pubsub_subscription_id ?? '').trim());
    // Report only the modes that actually started: each setup routine leaves its
    // timer set only when it completed successfully.
    const pushActive = this.watchTimer !== undefined;
    const pullActive = this.pullTimer !== undefined;

    const modes: string[] = [];
    if (pushActive) {
      modes.push('push');
    }
    if (pullActive) {
      modes.push('pull');
    }
    modes.push('poll');

    const pullSuffix = pullActive
      ? `, pubsub_pull_interval=${this.config.pubsub_pull_interval ?? DEFAULT_PULL_INTERVAL}`
      : '';
    console.log(
      `GmailWatcher: Connected (${modes.join('+')}, poll_interval=${this.config.poll_interval ?? DEFAULT_POLL_INTERVAL}${pullSuffix})`,
    );

    auditLogger?.systemStart('GmailWatcher', {
      modes: modes.join('+'),
      poll_interval: this.config.poll_interval,
      pubsub_topic: pushConfigured ? 'configured' : 'none',
      pubsub_subscription_id: pullConfigured ? 'configured' : 'none',
    });
  }

  /** Stop all timers, close the Pub/Sub client (errors ignored) and drop the OAuth client. */
  async disconnect(): Promise<void> {
    if (this.pollTimer) {
      clearInterval(this.pollTimer);
      this.pollTimer = undefined;
    }
    if (this.watchTimer) {
      clearInterval(this.watchTimer);
      this.watchTimer = undefined;
    }
    if (this.pullTimer) {
      clearInterval(this.pullTimer);
      this.pullTimer = undefined;
    }
    if (this.pubsubSubscriber) {
      try {
        await this.pubsubSubscriber.close();
      } catch {
        // Ignore shutdown errors
      }
      this.pubsubSubscriber = undefined;
    }

    this.oauth2Client = undefined;
    this._status = 'disconnected';
    auditLogger?.systemStop('GmailWatcher');
  }

  /**
   * Route a response to the configured output channel.
   * `peerId` is ignored: the destination is always `config.output.peer`.
   */
  async send(peerId: string, message: OutboundMessage): Promise<void> {
    const outputAdapter = this.channelLookup.get(this.config.output.channel);
    if (!outputAdapter) {
      console.warn(`GmailWatcher: Output channel '${this.config.output.channel}' not found`);
      return;
    }
    await outputAdapter.send(this.config.output.peer, message);
  }

  onMessage(handler: (msg: InboundMessage) => void): void {
    this.messageHandler = handler;
  }

  /**
   * Handle a Pub/Sub push notification from Google.
   * Called by the gateway when POST /gmail/push is received.
   * Never rejects: failures are logged.
   * @param data Base64-encoded Pub/Sub message data
   */
  async handlePushNotification(data: string): Promise<void> {
    try {
      await this.applyNotification(data);
    } catch (error) {
      console.error('GmailWatcher: Push notification error —', error instanceof Error ? error.message : error);
    }
  }

  /**
   * Apply one notification: process history since the current cursor, then
   * advance the cursor. Malformed payloads are logged and treated as handled;
   * processing failures are thrown so the pull path can leave the message
   * unacknowledged for redelivery.
   */
  private async applyNotification(data: string): Promise<void> {
    const historyId = this.parseNotificationHistoryId(data);
    if (historyId === null) {
      return;
    }

    const previous = this.lastHistoryId;
    // Only process if the new historyId is greater than our last known one
    if (previous && BigInt(historyId) <= BigInt(previous)) {
      return;
    }
    if (previous) {
      await this.processHistoryChanges(previous);
    }

    // processHistoryChanges may already have moved the cursor past this
    // notification; never move it backwards, or the next poll would deliver
    // the same messages again.
    if (!this.lastHistoryId || BigInt(historyId) > BigInt(this.lastHistoryId)) {
      this.lastHistoryId = historyId;
    }
  }

  /**
   * Decode a base64 notification and extract its historyId as a decimal string.
   * Returns null (after logging) when the payload is unusable.
   */
  private parseNotificationHistoryId(data: string): string | null {
    let parsed: unknown;
    try {
      parsed = JSON.parse(Buffer.from(data, 'base64').toString('utf-8'));
    } catch (error) {
      console.error('GmailWatcher: Push notification error —', error instanceof Error ? error.message : error);
      return null;
    }

    const rawHistoryId =
      typeof parsed === 'object' && parsed !== null
        ? (parsed as Partial<PubSubNotification>).historyId
        : undefined;
    if (!rawHistoryId) {
      console.warn('GmailWatcher: Push notification missing historyId');
      return null;
    }

    const historyId = String(rawHistoryId);
    // BigInt() throws on non-numeric text; reject it here instead.
    if (!/^\d+$/.test(historyId)) {
      console.warn('GmailWatcher: Push notification has non-numeric historyId');
      return null;
    }
    return historyId;
  }

  /**
   * Authorize with Gmail using stored OAuth2 credentials.
   * Reads client credentials and stored token from config paths.
   * @throws Error when the credentials or token file is missing or invalid.
   */
  private async authorize(): Promise<Auth.OAuth2Client> {
    const credentialsPath = this.config.credentials_file;
    if (!credentialsPath) {
      throw new Error('No credentials_file configured. Set automation.gmail.credentials_file in config.');
    }

    const expandedCredsPath = this.expandPath(credentialsPath);
    if (!existsSync(expandedCredsPath)) {
      throw new Error(`Credentials file not found: ${expandedCredsPath}`);
    }

    const credentials = JSON.parse(readFileSync(expandedCredsPath, 'utf-8'));
    const { client_id, client_secret, redirect_uris, project_id } =
      credentials.installed ?? credentials.web ?? {};

    if (project_id && typeof project_id === 'string') {
      this.googleProjectId = project_id;
    }
    if (!client_id || !client_secret) {
      throw new Error('Invalid credentials file — missing client_id or client_secret');
    }

    const oauth2Client = new google.auth.OAuth2(
      client_id,
      client_secret,
      redirect_uris?.[0] ?? 'http://localhost',
    );

    // Load stored token
    const tokenPath = this.expandPath(this.config.token_file ?? DEFAULT_TOKEN_FILE);
    if (!existsSync(tokenPath)) {
      throw new Error(
        `Token file not found: ${tokenPath}. Run "flynn gmail-auth" to authenticate.`,
      );
    }
    const token = JSON.parse(readFileSync(tokenPath, 'utf-8'));
    oauth2Client.setCredentials(token);

    // Auto-save refreshed tokens. Merge into the most recently saved token so a
    // field delivered by an earlier refresh is not lost on the next one.
    let storedToken = token;
    oauth2Client.on('tokens', (newTokens) => {
      storedToken = { ...storedToken, ...newTokens };
      try {
        this.saveToken(storedToken);
      } catch (error) {
        // A failed save must not throw out of the event listener.
        console.error(
          'GmailWatcher: Failed to save refreshed token —',
          error instanceof Error ? error.message : error,
        );
      }
    });

    return oauth2Client;
  }

  /**
   * Set up Gmail Pub/Sub watch for push notifications.
   * Registers the watch once and schedules renewal before expiry.
   * @throws Error when the topic name is invalid or the initial registration fails.
   */
  private async setupWatch(): Promise<void> {
    if (!this.oauth2Client) {
      return;
    }
    if (this.watchTimer) {
      clearInterval(this.watchTimer);
      this.watchTimer = undefined;
    }

    const topicName = this.resolvePubSubTopicName();
    if (!topicName) {
      // Push notifications are optional; polling is always enabled.
      return;
    }

    await this.registerWatch(topicName);

    // Schedule renewal before watch expiry (~7 days). The tick re-registers
    // without touching the timer, so one failed renewal is retried next tick.
    this.watchTimer = setInterval(() => {
      this.registerWatch(topicName).catch((err) => {
        console.error('GmailWatcher: Watch renewal failed —', err instanceof Error ? err.message : err);
      });
    }, WATCH_RENEWAL_MS);
  }

  /** Call gmail.users.watch() for the given topic and seed the history cursor if it is unset. */
  private async registerWatch(topicName: string): Promise<void> {
    if (!this.oauth2Client) {
      return;
    }
    const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
    const watchResponse = await gmail.users.watch({
      userId: 'me',
      requestBody: {
        labelIds: this.config.watch_labels ?? ['INBOX'],
        topicName,
      },
    });

    // Only use the response as a baseline. Overwriting an existing cursor on
    // renewal would skip changes that have not been processed yet.
    if (watchResponse.data.historyId && !this.lastHistoryId) {
      this.lastHistoryId = watchResponse.data.historyId.toString();
    }
    console.log(`GmailWatcher: Watch registered (historyId=${this.lastHistoryId})`);
  }

  /** Build troubleshooting tips appended to the watch-setup warning. */
  private buildWatchErrorHint(errMsg: string): string {
    const hints: string[] = [];
    if (errMsg.includes('Invalid topicName')) {
      hints.push(
        `Tip: set automation.gmail.pubsub_topic to "projects/${this.googleProjectId ?? '<project>'}/topics/${DEFAULT_TOPIC_ID}"`,
      );
    }
    if (/permission denied|PERMISSION_DENIED/i.test(errMsg)) {
      hints.push('Tip: ensure Gmail has permission to publish to the Pub/Sub topic (IAM)');
    }
    hints.push('Tip: if Google cannot reach your gateway, set automation.gmail.pubsub_subscription_id for pull mode');
    return `\n ${hints.join('\n ')}`;
  }

  /** Trimmed topic from config or the FLYNN_GMAIL_PUBSUB_TOPIC env var ('' when unset). */
  private configuredTopic(): string {
    return (this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC ?? '').trim();
  }

  /**
   * Resolve the Pub/Sub topic resource name for Gmail push notifications.
   *
   * Priority:
   * 1) automation.gmail.pubsub_topic
   * 2) FLYNN_GMAIL_PUBSUB_TOPIC env var
   * If neither is provided, push notifications are disabled (returns null).
   * @throws Error when the name is shorthand without a known project, or malformed.
   */
  private resolvePubSubTopicName(): string | null {
    let topic = this.configuredTopic();
    if (!topic) {
      return null;
    }

    // Allow shorthand: just the topic id (e.g. "gmail-push")
    if (!topic.includes('/')) {
      if (!this.googleProjectId) {
        throw new Error(
          `pubsub_topic '${topic}' must be fully qualified (projects/<project>/topics/<topic>) because project_id was not found in credentials`,
        );
      }
      topic = `projects/${this.googleProjectId}/topics/${topic}`;
    }

    if (!/^projects\/[^/]+\/topics\/[^/]+$/.test(topic)) {
      throw new Error(
        `Invalid pubsub_topic '${topic}'. Expected: projects/<project>/topics/<topic>`,
      );
    }
    return topic;
  }

  /**
   * Resolve the Pub/Sub subscription resource name for pull mode, or null when
   * no subscription is configured.
   * @throws Error when the name is shorthand without a known project, or malformed.
   */
  private resolvePubSubSubscriptionName(): string | null {
    let sub = (this.config.pubsub_subscription_id ?? '').trim();
    if (!sub) {
      return null;
    }

    // Allow shorthand: just the subscription id (e.g. "gmail-pull")
    if (!sub.includes('/')) {
      if (!this.googleProjectId) {
        throw new Error(
          `pubsub_subscription_id '${sub}' must be fully qualified (projects/<project>/subscriptions/<subscription>) because project_id was not found in credentials`,
        );
      }
      sub = `projects/${this.googleProjectId}/subscriptions/${sub}`;
    }

    if (!/^projects\/[^/]+\/subscriptions\/[^/]+$/.test(sub)) {
      throw new Error(
        `Invalid pubsub_subscription_id '${sub}'. Expected: projects/<project>/subscriptions/<subscription>`,
      );
    }
    return sub;
  }

  /** Start periodic Pub/Sub pulls when a subscription is configured. */
  private async setupPullSubscription(): Promise<void> {
    const subscriptionName = this.resolvePubSubSubscriptionName();
    if (!subscriptionName) {
      return;
    }
    if (this.pullTimer) {
      clearInterval(this.pullTimer);
      this.pullTimer = undefined;
    }

    const pullMs = parseInterval(this.config.pubsub_pull_interval ?? DEFAULT_PULL_INTERVAL);
    const logPullError = (err: unknown): void => {
      console.error('GmailWatcher: Pub/Sub pull error —', err instanceof Error ? err.message : err);
    };

    // Kick once immediately, then on interval.
    await this.pullSubscriptionMessages().catch(logPullError);
    this.pullTimer = setInterval(() => {
      this.pullSubscriptionMessages().catch(logPullError);
    }, pullMs);

    console.log(
      `GmailWatcher: Pull enabled (subscription=${subscriptionName}, interval=${this.config.pubsub_pull_interval ?? DEFAULT_PULL_INTERVAL})`,
    );
  }

  /** Lazily import @google-cloud/pubsub and cache one SubscriberClient. */
  private async getSubscriberClient(): Promise<v1.SubscriberClient> {
    if (this.pubsubSubscriber) {
      return this.pubsubSubscriber;
    }
    const mod = await import('@google-cloud/pubsub');
    this.pubsubSubscriber = new mod.v1.SubscriberClient();
    return this.pubsubSubscriber;
  }

  /**
   * Pull one batch of notifications and acknowledge those that were handled.
   * Messages whose processing throws stay unacknowledged so they are retried.
   */
  private async pullSubscriptionMessages(): Promise<void> {
    const subscription = this.resolvePubSubSubscriptionName();
    if (!subscription || this.pullInFlight) {
      return;
    }

    this.pullInFlight = true;
    try {
      const client = await this.getSubscriberClient();
      const maxMessages = this.config.pubsub_max_messages ?? 10;
      const [response] = await client.pull({ subscription, maxMessages });

      const ackIds: string[] = [];
      for (const receivedMessage of response.receivedMessages ?? []) {
        const ackId = receivedMessage.ackId;
        if (!ackId) {
          continue;
        }

        const data = receivedMessage.message?.data;
        if (!data) {
          // Nothing to process and nothing will change on redelivery: ack it
          // so it does not come back forever.
          ackIds.push(ackId);
          continue;
        }

        // NOTE(review): a string payload is presumed to already be base64 — confirm
        // against the Pub/Sub client configuration.
        const base64 = typeof data === 'string' ? data : Buffer.from(data).toString('base64');
        try {
          await this.applyNotification(base64);
          ackIds.push(ackId);
        } catch (error) {
          // If processing fails, leave message unacked for retry.
          console.error(
            'GmailWatcher: Pub/Sub message processing failed (left unacked) —',
            error instanceof Error ? error.message : error,
          );
        }
      }

      if (ackIds.length > 0) {
        await client.acknowledge({ subscription, ackIds });
      }
    } finally {
      this.pullInFlight = false;
    }
  }

  /**
   * Poll Gmail History API for new messages since lastHistoryId.
   * Fallback mechanism when Pub/Sub push is not available.
   */
  private async pollForNewMessages(): Promise<void> {
    if (!this.oauth2Client) {
      return;
    }

    // If no historyId yet, initialize it from the profile
    if (!this.lastHistoryId) {
      const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
      try {
        const profile = await gmail.users.getProfile({ userId: 'me' });
        if (profile.data.historyId) {
          this.lastHistoryId = profile.data.historyId.toString();
        }
      } catch (error) {
        console.error('GmailWatcher: Failed to get profile —', error instanceof Error ? error.message : error);
      }
      return; // First poll — just establish the baseline
    }

    await this.processHistoryChanges(this.lastHistoryId);
  }

  /**
   * Fetch history changes since the given historyId and process new messages.
   * Follows pagination, then updates lastHistoryId to the latest value from
   * the response. A 404 resets the cursor from the profile; other errors are
   * rethrown.
   */
  private async processHistoryChanges(startHistoryId: string): Promise<void> {
    if (!this.oauth2Client) {
      return;
    }
    const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });

    try {
      const processedIds = new Set<string>();
      let latestHistoryId: string | undefined;
      let pageToken: string | undefined;

      do {
        const page = await this.fetchHistoryPage(startHistoryId, pageToken);

        for (const record of page.history ?? []) {
          for (const added of record.messagesAdded ?? []) {
            const messageId = added.message?.id;
            if (!messageId || processedIds.has(messageId)) {
              continue;
            }
            processedIds.add(messageId);
            await this.forwardMessage(messageId);
          }
        }

        if (page.historyId) {
          latestHistoryId = page.historyId.toString();
        }
        pageToken = page.nextPageToken ?? undefined;
      } while (pageToken);

      // Advance the cursor only after every page was handled, so a failure
      // part-way through is retried from the same starting point.
      if (latestHistoryId) {
        this.lastHistoryId = latestHistoryId;
      }
    } catch (error: unknown) {
      // 404 means historyId is too old — reset by fetching profile
      if (!this.isNotFoundError(error)) {
        throw error;
      }
      console.warn('GmailWatcher: History expired, re-syncing...');
      try {
        const profile = await gmail.users.getProfile({ userId: 'me' });
        if (profile.data.historyId) {
          this.lastHistoryId = profile.data.historyId.toString();
        }
      } catch (profileError) {
        console.error(
          'GmailWatcher: Failed to re-sync profile —',
          profileError instanceof Error ? profileError.message : profileError,
        );
      }
    }
  }

  /** Fetch one page of messageAdded history for the first watched label. */
  private async fetchHistoryPage(startHistoryId: string, pageToken: string | undefined) {
    const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
    const response = await gmail.users.history.list({
      userId: 'me',
      startHistoryId,
      labelId: (this.config.watch_labels ?? ['INBOX'])[0],
      historyTypes: ['messageAdded'],
      // Omit the key entirely on the first request.
      ...(pageToken ? { pageToken } : {}),
    });
    return response.data;
  }

  /** Load one message, apply the history_start filter, and hand it to the message handler. */
  private async forwardMessage(messageId: string): Promise<void> {
    const email = await this.getMessageDetails(messageId);
    if (!email) {
      return;
    }

    // Skip messages before history_start if configured
    if (this.config.history_start) {
      const emailDate = new Date(email.date);
      const startDate = new Date(this.config.history_start);
      if (emailDate < startDate) {
        return;
      }
    }

    const msg: InboundMessage = {
      id: `gmail-${email.id}-${Date.now()}`,
      channel: 'gmail',
      senderId: email.from,
      senderName: `gmail:${email.from}`,
      text: this.renderTemplate(email),
      timestamp: Date.now(),
      metadata: {
        emailId: email.id,
        from: email.from,
        to: email.to,
        subject: email.subject,
        labels: email.labels,
      },
    };
    this.messageHandler?.(msg);
  }

  /**
   * True when the error looks like an HTTP 404.
   * NOTE(review): checks both `code` (number or numeric string) and `status`
   * because the error shape depends on the HTTP client version — confirm.
   */
  private isNotFoundError(error: unknown): boolean {
    if (!(error instanceof Error)) {
      return false;
    }
    const { code, status } = error as { code?: unknown; status?: unknown };
    return Number(code) === 404 || status === 404;
  }

  /**
   * Fetch message metadata by ID and extract relevant headers.
   * Returns null (after logging) when the fetch fails.
   */
  private async getMessageDetails(messageId: string): Promise<EmailInfo | null> {
    if (!this.oauth2Client) {
      return null;
    }
    const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });

    try {
      const msg = await gmail.users.messages.get({
        userId: 'me',
        id: messageId,
        format: 'metadata',
        metadataHeaders: ['From', 'To', 'Subject', 'Date'],
      });

      const headers = msg.data.payload?.headers ?? [];
      // Header names are matched case-insensitively; a missing header yields ''.
      const getHeader = (name: string): string =>
        headers.find((h) => h.name?.toLowerCase() === name.toLowerCase())?.value ?? '';

      return {
        id: messageId,
        from: getHeader('From'),
        to: getHeader('To'),
        subject: getHeader('Subject'),
        snippet: sanitizeHtml(msg.data.snippet ?? ''),
        date: getHeader('Date'),
        labels: msg.data.labelIds ?? [],
      };
    } catch (error) {
      console.error(
        `GmailWatcher: Failed to fetch message ${messageId} —`,
        error instanceof Error ? error.message : error,
      );
      return null;
    }
  }

  /**
   * Render the message template with email placeholders.
   * Supported: {{from}}, {{to}}, {{subject}}, {{snippet}}, {{date}}, {{id}}, {{labels}}
   *
   * Substitution is a single pass with a replacer function, so email content is
   * inserted verbatim: `$`-sequences in it are not interpreted, and placeholder
   * text inside an email field is not expanded again.
   */
  renderTemplate(email: EmailInfo): string {
    const values: Record<string, string> = {
      from: email.from,
      to: email.to,
      subject: email.subject,
      snippet: email.snippet,
      date: email.date,
      id: email.id,
      labels: email.labels.join(', '),
    };
    return this.config.message.replace(
      TEMPLATE_PLACEHOLDER,
      (placeholder: string, key: string) => values[key] ?? placeholder,
    );
  }

  /**
   * Expand ~ to the user's home directory and resolve to an absolute path.
   */
  expandPath(p: string): string {
    if (p.startsWith('~/') || p === '~') {
      // slice(2) drops "~/"; for a bare "~" it yields '' and resolves to the home directory.
      return resolve(homedir(), p.slice(2));
    }
    return resolve(p);
  }

  /**
   * Save token to disk with restrictive permissions (0o600).
   */
  private saveToken(token: unknown): void {
    const tokenPath = this.expandPath(this.config.token_file ?? DEFAULT_TOKEN_FILE);
    const dir = dirname(tokenPath);
    if (!existsSync(dir)) {
      mkdirSync(dir, { recursive: true });
    }

    // `mode` applies when the file is created, so a new token file is never
    // readable by others, even briefly.
    writeFileSync(tokenPath, JSON.stringify(token, null, 2), { encoding: 'utf-8', mode: 0o600 });
    try {
      // Still needed for a file that already existed with wider permissions.
      chmodSync(tokenPath, 0o600);
    } catch {
      // chmod may fail on some filesystems — not critical
    }
  }
}