feat: add OpenAI OAuth, strict model overrides, and Gmail pull mode

This commit is contained in:
William Valentin
2026-02-13 14:55:40 -08:00
parent 8f644d5e25
commit 955b9e28e0
50 changed files with 5955 additions and 160 deletions
+154 -3
View File
@@ -1,6 +1,6 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { homedir } from 'os';
import { GmailWatcher } from './gmail.js';
import type { GmailWatcher as GmailWatcherType } from './gmail.js';
import type { OutboundMessage } from '../channels/types.js';
// Mock googleapis module
@@ -74,6 +74,23 @@ vi.mock('googleapis', () => {
};
});
vi.mock('@google-cloud/pubsub', () => {
const pull = vi.fn().mockResolvedValue([{ receivedMessages: [] }]);
const acknowledge = vi.fn().mockResolvedValue([{}]);
const close = vi.fn().mockResolvedValue(undefined);
class SubscriberClient {
pull = pull;
acknowledge = acknowledge;
close = close;
}
return {
v1: { SubscriberClient },
_mocks: { pull, acknowledge, close },
};
});
// Mock fs operations
vi.mock('fs', async () => {
const actual = await vi.importActual<typeof import('fs')>('fs');
@@ -86,6 +103,7 @@ vi.mock('fs', async () => {
installed: {
client_id: 'test-client-id',
client_secret: 'test-client-secret',
project_id: 'test-project',
redirect_uris: ['http://localhost'],
},
});
@@ -108,6 +126,9 @@ function createMockConfig(overrides = {}) {
enabled: true,
credentials_file: '~/.config/flynn/gmail-credentials.json',
token_file: '~/.config/flynn/gmail-token.json',
disable_push: false,
pubsub_pull_interval: '60s',
pubsub_max_messages: 10,
watch_labels: ['INBOX'],
poll_interval: '300s',
output: {
@@ -128,12 +149,16 @@ function createMockChannelLookup() {
}
describe('GmailWatcher', () => {
let watcher: GmailWatcher;
let GmailWatcher: typeof GmailWatcherType;
let watcher: GmailWatcherType;
let channelLookup: ReturnType<typeof createMockChannelLookup>;
beforeEach(() => {
beforeEach(async () => {
vi.useFakeTimers();
channelLookup = createMockChannelLookup();
// Import after mocks so ESM named imports (fs/googleapis) are properly mocked.
({ GmailWatcher } = await import('./gmail.js'));
});
afterEach(async () => {
@@ -154,6 +179,60 @@ describe('GmailWatcher', () => {
});
});
describe('push topic resolution', () => {
it('returns null when pubsub_topic is not set', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const topic = (watcher as unknown as { resolvePubSubTopicName: () => string | null }).resolvePubSubTopicName();
expect(topic).toBe(null);
});
it('expands shorthand topic id when project_id is known', () => {
const config = createMockConfig({ pubsub_topic: 'my-topic' });
watcher = new GmailWatcher(config, channelLookup);
(watcher as unknown as { googleProjectId: string }).googleProjectId = 'test-project';
const topic = (watcher as unknown as { resolvePubSubTopicName: () => string | null }).resolvePubSubTopicName();
expect(topic).toBe('projects/test-project/topics/my-topic');
});
it('rejects invalid pubsub_topic formats', () => {
const config = createMockConfig({ pubsub_topic: 'projects/test-project/topic/my-topic' });
watcher = new GmailWatcher(config, channelLookup);
expect(() => {
(watcher as unknown as { resolvePubSubTopicName: () => string | null }).resolvePubSubTopicName();
}).toThrow(/Invalid pubsub_topic/);
});
});
describe('pull subscription resolution', () => {
it('returns null when pubsub_subscription_id is not set', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const sub = (watcher as unknown as { resolvePubSubSubscriptionName: () => string | null }).resolvePubSubSubscriptionName();
expect(sub).toBe(null);
});
it('expands shorthand subscription id when project_id is known', () => {
const config = createMockConfig({ pubsub_subscription_id: 'my-sub' });
watcher = new GmailWatcher(config, channelLookup);
(watcher as unknown as { googleProjectId: string }).googleProjectId = 'test-project';
const sub = (watcher as unknown as { resolvePubSubSubscriptionName: () => string | null }).resolvePubSubSubscriptionName();
expect(sub).toBe('projects/test-project/subscriptions/my-sub');
});
it('rejects invalid pubsub_subscription_id formats', () => {
const config = createMockConfig({ pubsub_subscription_id: 'projects/test-project/subscription/my-sub' });
watcher = new GmailWatcher(config, channelLookup);
expect(() => {
(watcher as unknown as { resolvePubSubSubscriptionName: () => string | null }).resolvePubSubSubscriptionName();
}).toThrow(/Invalid pubsub_subscription_id/);
});
});
describe('connect() with missing credentials', () => {
it('logs warning and sets status to error when credentials_file is missing', async () => {
const config = createMockConfig({ credentials_file: undefined });
@@ -200,6 +279,78 @@ describe('GmailWatcher', () => {
});
});
describe('push disable flag', () => {
it('skips watch setup when disable_push is true', async () => {
const config = createMockConfig({ disable_push: true, pubsub_topic: 'projects/test-project/topics/gmail-push' });
watcher = new GmailWatcher(config, channelLookup);
const { existsSync, readFileSync } = await import('fs');
vi.mocked(existsSync).mockReturnValue(true);
vi.mocked(readFileSync).mockImplementation((path: unknown) => {
const p = String(path);
if (p.includes('credentials')) {
return JSON.stringify({
installed: {
client_id: 'test-client-id',
client_secret: 'test-client-secret',
project_id: 'test-project',
redirect_uris: ['http://localhost'],
},
});
}
return JSON.stringify({
access_token: 'test-access-token',
refresh_token: 'test-refresh-token',
expiry_date: Date.now() + 3600000,
});
});
const googleapis = await import('googleapis') as unknown as {
_mocks: {
mockWatch: ReturnType<typeof vi.fn>;
mockOAuth2: ReturnType<typeof vi.fn>;
};
};
googleapis._mocks.mockOAuth2.mockImplementation(() => ({
setCredentials: vi.fn(),
on: vi.fn(),
}));
const watchSpy = googleapis._mocks.mockWatch;
await watcher.connect();
expect(watchSpy).not.toHaveBeenCalled();
expect(watcher.status).toBe('connected');
});
});
describe('pullSubscriptionMessages', () => {
it('pulls messages and acknowledges successfully processed ones', async () => {
const config = createMockConfig({ pubsub_subscription_id: 'projects/test-project/subscriptions/gmail-pull' });
watcher = new GmailWatcher(config, channelLookup);
const { _mocks: pubsubMocks } = await import('@google-cloud/pubsub') as unknown as {
_mocks: { pull: ReturnType<typeof vi.fn>; acknowledge: ReturnType<typeof vi.fn> };
};
const payload = { emailAddress: 'bob@example.com', historyId: '200' };
pubsubMocks.pull.mockResolvedValueOnce([
{
receivedMessages: [
{ ackId: 'ack-1', message: { data: Buffer.from(JSON.stringify(payload)) } },
],
},
]);
await (watcher as unknown as { pullSubscriptionMessages: () => Promise<void> }).pullSubscriptionMessages();
expect((watcher as unknown as { lastHistoryId: string }).lastHistoryId).toBe('200');
expect(pubsubMocks.acknowledge).toHaveBeenCalledWith({
subscription: 'projects/test-project/subscriptions/gmail-pull',
ackIds: ['ack-1'],
});
});
});
describe('renderTemplate', () => {
it('replaces all placeholders correctly', () => {
const config = createMockConfig({
+222 -11
View File
@@ -2,6 +2,7 @@ import { google, type Auth } from 'googleapis';
import { readFileSync, writeFileSync, existsSync, mkdirSync, chmodSync } from 'fs';
import { dirname, resolve } from 'path';
import { homedir } from 'os';
import type { v1 } from '@google-cloud/pubsub';
import type { GmailConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { parseInterval } from './heartbeat.js';
@@ -30,9 +31,7 @@ interface PubSubNotification {
historyId: string;
}
// Google Cloud Pub/Sub topic for Gmail push notifications.
// This must be pre-configured in Google Cloud Console.
const GMAIL_PUBSUB_TOPIC = 'projects/flynn-agent/topics/gmail-push';
const DEFAULT_TOPIC_ID = 'gmail-push';
// Watch expires after ~7 days; renew at 6 days (in ms).
const WATCH_RENEWAL_MS = 6 * 24 * 60 * 60 * 1000;
@@ -56,7 +55,11 @@ export class GmailWatcher implements ChannelAdapter {
private lastHistoryId?: string;
private pollTimer?: ReturnType<typeof setInterval>;
private watchTimer?: ReturnType<typeof setInterval>;
private pullTimer?: ReturnType<typeof setInterval>;
private pubsubSubscriber?: v1.SubscriberClient;
private pullInFlight = false;
private readonly config: NonNullable<GmailConfig>;
private googleProjectId?: string;
constructor(
config: NonNullable<GmailConfig>,
@@ -82,12 +85,28 @@ export class GmailWatcher implements ChannelAdapter {
return;
}
// Set up Gmail push watch (Pub/Sub)
// Set up Gmail push watch (Pub/Sub). Polling is always enabled.
if (!this.config.disable_push) {
try {
await this.setupWatch();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
const hint = this.buildWatchErrorHint(errMsg);
console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}${hint}`);
}
} else {
const configured = (this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC ?? '').trim();
if (configured) {
console.log('GmailWatcher: Push disabled (disable_push=true)');
}
}
// Set up Pub/Sub pull subscription (optional).
try {
await this.setupWatch();
await this.setupPullSubscription();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}`);
console.warn(`GmailWatcher: Pull setup failed (will continue without pull) — ${errMsg}`);
}
// Start polling fallback
@@ -99,8 +118,23 @@ export class GmailWatcher implements ChannelAdapter {
}, pollMs);
this._status = 'connected';
console.log(`GmailWatcher: Connected (poll_interval=${this.config.poll_interval ?? '300s'})`);
auditLogger?.systemStart('GmailWatcher', { poll_interval: this.config.poll_interval });
const modes: string[] = [];
const pushConfigured = Boolean((this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC ?? '').trim());
const pullConfigured = Boolean((this.config.pubsub_subscription_id ?? '').trim());
if (pushConfigured && !this.config.disable_push) {modes.push('push');}
if (pullConfigured) {modes.push('pull');}
modes.push('poll');
console.log(
`GmailWatcher: Connected (${modes.join('+')}, poll_interval=${this.config.poll_interval ?? '300s'}${pullConfigured ? `, pubsub_pull_interval=${this.config.pubsub_pull_interval ?? '60s'}` : ''})`,
);
auditLogger?.systemStart('GmailWatcher', {
modes: modes.join('+'),
poll_interval: this.config.poll_interval,
pubsub_topic: pushConfigured ? 'configured' : 'none',
pubsub_subscription_id: pullConfigured ? 'configured' : 'none',
});
}
async disconnect(): Promise<void> {
@@ -109,9 +143,21 @@ export class GmailWatcher implements ChannelAdapter {
this.pollTimer = undefined;
}
if (this.watchTimer) {
clearTimeout(this.watchTimer);
clearInterval(this.watchTimer);
this.watchTimer = undefined;
}
if (this.pullTimer) {
clearInterval(this.pullTimer);
this.pullTimer = undefined;
}
if (this.pubsubSubscriber) {
try {
await this.pubsubSubscriber.close();
} catch {
// Ignore shutdown errors
}
this.pubsubSubscriber = undefined;
}
this.oauth2Client = undefined;
this._status = 'disconnected';
auditLogger?.systemStop('GmailWatcher');
@@ -178,7 +224,10 @@ export class GmailWatcher implements ChannelAdapter {
}
const credentials = JSON.parse(readFileSync(expandedCredsPath, 'utf-8'));
const { client_id, client_secret, redirect_uris } = credentials.installed ?? credentials.web ?? {};
const { client_id, client_secret, redirect_uris, project_id } = credentials.installed ?? credentials.web ?? {};
if (project_id && typeof project_id === 'string') {
this.googleProjectId = project_id;
}
if (!client_id || !client_secret) {
throw new Error('Invalid credentials file — missing client_id or client_secret');
@@ -217,13 +266,24 @@ export class GmailWatcher implements ChannelAdapter {
private async setupWatch(): Promise<void> {
if (!this.oauth2Client) {return;}
if (this.watchTimer) {
clearInterval(this.watchTimer);
this.watchTimer = undefined;
}
const topicName = this.resolvePubSubTopicName();
if (!topicName) {
// Push notifications are optional; polling is always enabled.
return;
}
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
const watchResponse = await gmail.users.watch({
userId: 'me',
requestBody: {
labelIds: this.config.watch_labels ?? ['INBOX'],
topicName: GMAIL_PUBSUB_TOPIC,
topicName,
},
});
@@ -241,6 +301,157 @@ export class GmailWatcher implements ChannelAdapter {
}, WATCH_RENEWAL_MS);
}
private buildWatchErrorHint(errMsg: string): string {
const hints: string[] = [];
if (errMsg.includes('Invalid topicName')) {
hints.push(
`Tip: set automation.gmail.pubsub_topic to "projects/${this.googleProjectId ?? '<project-id>'}/topics/${DEFAULT_TOPIC_ID}"`,
);
}
if (/permission denied|PERMISSION_DENIED/i.test(errMsg)) {
hints.push('Tip: ensure Gmail has permission to publish to the Pub/Sub topic (IAM)');
}
hints.push('Tip: if Google cannot reach your gateway, set automation.gmail.pubsub_subscription_id for pull mode');
return hints.length > 0 ? `\n ${hints.join('\n ')}` : '';
}
/**
* Resolve the Pub/Sub topic resource name for Gmail push notifications.
*
* Priority:
* 1) automation.gmail.pubsub_topic
* 2) FLYNN_GMAIL_PUBSUB_TOPIC env var
* If neither is provided, push notifications are disabled.
*/
private resolvePubSubTopicName(): string | null {
const configured = this.config.pubsub_topic ?? process.env.FLYNN_GMAIL_PUBSUB_TOPIC;
let topic = (configured ?? '').trim();
if (!topic) {return null;}
// Allow shorthand: just the topic id (e.g. "gmail-push")
if (!topic.includes('/')) {
if (!this.googleProjectId) {
throw new Error(
`pubsub_topic '${topic}' must be fully qualified (projects/<project-id>/topics/<topic>) because project_id was not found in credentials`,
);
}
topic = `projects/${this.googleProjectId}/topics/${topic}`;
}
const isValid = /^projects\/[^/]+\/topics\/[^/]+$/.test(topic);
if (!isValid) {
throw new Error(
`Invalid pubsub_topic '${topic}'. Expected: projects/<project-id>/topics/<topic>`,
);
}
return topic;
}
private resolvePubSubSubscriptionName(): string | null {
let sub = (this.config.pubsub_subscription_id ?? '').trim();
if (!sub) {return null;}
// Allow shorthand: just the subscription id (e.g. "gmail-pull")
if (!sub.includes('/')) {
if (!this.googleProjectId) {
throw new Error(
`pubsub_subscription_id '${sub}' must be fully qualified (projects/<project-id>/subscriptions/<subscription>) because project_id was not found in credentials`,
);
}
sub = `projects/${this.googleProjectId}/subscriptions/${sub}`;
}
const isValid = /^projects\/[^/]+\/subscriptions\/[^/]+$/.test(sub);
if (!isValid) {
throw new Error(
`Invalid pubsub_subscription_id '${sub}'. Expected: projects/<project-id>/subscriptions/<subscription>`,
);
}
return sub;
}
private async setupPullSubscription(): Promise<void> {
const subscriptionName = this.resolvePubSubSubscriptionName();
if (!subscriptionName) {return;}
if (this.pullTimer) {
clearInterval(this.pullTimer);
this.pullTimer = undefined;
}
const pullMs = parseInterval(this.config.pubsub_pull_interval ?? '60s');
// Kick once immediately, then on interval.
await this.pullSubscriptionMessages().catch((err) => {
console.error('GmailWatcher: Pub/Sub pull error —', err instanceof Error ? err.message : err);
});
this.pullTimer = setInterval(() => {
this.pullSubscriptionMessages().catch((err) => {
console.error('GmailWatcher: Pub/Sub pull error —', err instanceof Error ? err.message : err);
});
}, pullMs);
console.log(
`GmailWatcher: Pull enabled (subscription=${subscriptionName}, interval=${this.config.pubsub_pull_interval ?? '60s'})`,
);
}
private async getSubscriberClient(): Promise<v1.SubscriberClient> {
if (this.pubsubSubscriber) {return this.pubsubSubscriber;}
const mod = await import('@google-cloud/pubsub');
this.pubsubSubscriber = new mod.v1.SubscriberClient();
return this.pubsubSubscriber;
}
private async pullSubscriptionMessages(): Promise<void> {
const subscription = this.resolvePubSubSubscriptionName();
if (!subscription) {return;}
if (this.pullInFlight) {return;}
this.pullInFlight = true;
try {
const client = await this.getSubscriberClient();
const maxMessages = this.config.pubsub_max_messages ?? 10;
const [response] = await client.pull({
subscription,
maxMessages,
});
const received = response.receivedMessages ?? [];
if (received.length === 0) {return;}
const ackIds: string[] = [];
for (const receivedMessage of received) {
const ackId = receivedMessage.ackId;
const data = receivedMessage.message?.data;
if (!ackId || !data) {continue;}
const base64 = Buffer.from(data as Uint8Array).toString('base64');
try {
await this.handlePushNotification(base64);
ackIds.push(ackId);
} catch {
// If processing fails, leave message unacked for retry.
}
}
if (ackIds.length > 0) {
await client.acknowledge({ subscription, ackIds });
}
} finally {
this.pullInFlight = false;
}
}
/**
* Poll Gmail History API for new messages since lastHistoryId.
* Fallback mechanism when Pub/Sub push is not available.