diff --git a/package.json b/package.json index b8f5f1b..e4610d3 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,7 @@ "commander": "^14.0.3", "croner": "^10.0.1", "discord.js": "^14.25.1", + "googleapis": "^148.0.0", "grammy": "^1.35.0", "ink": "^6.0.0", "ink-text-input": "^6.0.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0616c80..69e812b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: discord.js: specifier: ^14.25.1 version: 14.25.1 + googleapis: + specifier: ^148.0.0 + version: 148.0.0 grammy: specifier: ^1.35.0 version: 1.39.3 @@ -1206,6 +1209,9 @@ packages: resolution: {integrity: sha512-QxD8cf2eVqJOOz63z6JIN9BzvVs/dlySa5HGSBH5xtR8dPteIRQnBxxKqkNTiT6jbDTF6jAfrd4oMcND9RGbQg==} engines: {node: '>=0.6'} + bignumber.js@9.3.1: + resolution: {integrity: sha512-Ko0uX15oIUS7wJ3Rb30Fs6SkVbLmPBAKdlm7q9+ak9bbIeFf0MwuBsQV6z7+X768/cHsfg+WlysDWJcmthjsjQ==} + binary@0.3.0: resolution: {integrity: sha512-D4H1y5KYwpJgK8wk1Cue5LLPgmwHKYSChkbspQg5JtVuR5ulGckxfR62H3AE9UDkdMC8yyXlqYihuz3Aqg2XZg==} @@ -1677,6 +1683,9 @@ packages: resolution: {integrity: sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==} engines: {node: '>= 18'} + extend@3.0.2: + resolution: {integrity: sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==} + extract-zip@2.0.1: resolution: {integrity: sha512-GDhU9ntwuKyGXdZBUgTIe+vXnWj0fppUEtMDL0+idd5Sta8TGpHssn/eusA9mrPr9qNDym6SxAYZjNvCn/9RBg==} engines: {node: '>= 10.17.0'} @@ -1791,6 +1800,14 @@ packages: function-bind@1.1.2: resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} + gaxios@6.7.1: + resolution: {integrity: sha512-LDODD4TMYx7XXdpwxAVRAIAuB0bzv0s+ywFonY46k126qzQHT9ygyoa9tncmOiQmmDrik65UYsEkv3lbfqQ3yQ==} + engines: {node: '>=14'} + + gcp-metadata@6.1.1: + resolution: {integrity: sha512-a4tiq7E0/5fTjxPAaH4jpjkSv/uCaU2p5KC6HVGrvl0cDjA8iBZv4vv1gyzlmK0ZUKqwpOyQMKzZQe3lTit77A==} + engines: {node: '>=14'} + get-caller-file@2.0.5: resolution: {integrity: sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==} engines: {node: 6.* || 8.* || >= 10.*} @@ -1833,6 +1850,22 @@ packages: resolution: {integrity: sha512-oahGvuMGQlPw/ivIYBjVSrWAfWLBeku5tpPE2fOPLi+WHffIWbuh2tCjhyQhTBPMf5E9jDEH4FOmTYgYwbKwtQ==} engines: {node: '>=18'} + google-auth-library@9.15.1: + resolution: {integrity: sha512-Jb6Z0+nvECVz+2lzSMt9u98UsoakXxA2HGHMCxh+so3n90XgYWkq5dur19JAJV7ONiJY22yBTyJB1TSkvPq9Ng==} + engines: {node: '>=14'} + + google-logging-utils@0.0.2: + resolution: {integrity: sha512-NEgUnEcBiP5HrPzufUkBzJOD/Sxsco3rLNo1F1TNf7ieU8ryUzBhqba8r756CjLX7rn3fHl6iLEwPYuqpoKgQQ==} + engines: {node: '>=14'} + + googleapis-common@7.2.0: + resolution: {integrity: sha512-/fhDZEJZvOV3X5jmD+fKxMqma5q2Q9nZNSF3kn1F18tpxmA86BcTxAGBQdM0N89Z3bEaIs+HVznSmFJEAmMTjA==} + engines: {node: '>=14.0.0'} + + googleapis@148.0.0: + resolution: {integrity: sha512-8PDG5VItm6E1TdZWDqtRrUJSlBcNwz0/MwCa6AL81y/RxPGXJRUwKqGZfCoVX1ZBbfr3I4NkDxBmeTyOAZSWqw==} + engines: {node: '>=14.0.0'} + gopd@1.2.0: resolution: {integrity: sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==} engines: {node: '>= 0.4'} @@ -1844,6 +1877,10 @@ packages: resolution: {integrity: sha512-7arRRoOtOh9UwMwANZ475kJrWV6P3/EGNooeHlY0/SwZv4t3ZZ3Uiz9cAXK8Zg9xSdgmm8T21kx6n7SZaWvOcw==} engines: {node: ^12.20.0 || >=14.13.1} + gtoken@7.1.0: + resolution: {integrity: sha512-pCcEwRi+TKpMlxAQObHDQ56KawURgyAf6jtIY046fJ5tIv3zDe/LEIubckAO8fj6JnAxLdmWkUfNyulQ2iKdEw==} + engines: {node: '>=14.0.0'} + has-flag@4.0.0: resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==} engines: {node: '>=8'} @@ -2002,6 +2039,9 @@ packages: resolution: {integrity: sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==} hasBin: true + json-bigint@1.0.0: + resolution: {integrity: sha512-SiPv/8VpZuWbvLSMtTDU8hEfrZWg/mH/nV/b4o0CYbSxu1UIQPLdwKOCIyLQX+VIPO5vrLX3i8qtqFyhdPSUSQ==} + json-buffer@3.0.1: resolution: {integrity: sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==} @@ -2790,9 +2830,16 @@ packages: uri-js@4.4.1: resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==} + url-template@2.0.8: + resolution: {integrity: sha512-XdVKMF4SJ0nP/O7XIPB0JwAEuT9lDIYnNsK8yGVe43y0AWoKeJNdv3ZNWh7ksJ6KqQFjOO6ox/VEitLnaVNufw==} + util-deprecate@1.0.2: resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} + uuid@9.0.1: + resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} + hasBin: true + vary@1.1.2: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} @@ -4437,6 +4484,8 @@ snapshots: big-integer@1.6.52: optional: true + bignumber.js@9.3.1: {} + binary@0.3.0: dependencies: buffers: 0.1.1 @@ -4977,6 +5026,8 @@ snapshots: transitivePeerDependencies: - supports-color + extend@3.0.2: {} + extract-zip@2.0.1: dependencies: debug: 4.4.3 @@ -5089,6 +5140,26 @@ snapshots: function-bind@1.1.2: {} + gaxios@6.7.1: + dependencies: + extend: 3.0.2 + https-proxy-agent: 7.0.6 + is-stream: 2.0.1 + node-fetch: 2.7.0 + uuid: 9.0.1 + transitivePeerDependencies: + - encoding + - supports-color + + gcp-metadata@6.1.1: + dependencies: + gaxios: 6.7.1 + google-logging-utils: 0.0.2 + json-bigint: 1.0.0 + transitivePeerDependencies: + - encoding + - supports-color + get-caller-file@2.0.5: {} get-east-asian-width@1.4.0: {} @@ -5145,6 +5216,40 @@ snapshots: globals@14.0.0: {} + google-auth-library@9.15.1: + dependencies: + base64-js: 1.5.1 + ecdsa-sig-formatter: 1.0.11 + gaxios: 6.7.1 + gcp-metadata: 6.1.1 + gtoken: 7.1.0 + jws: 4.0.1 + transitivePeerDependencies: + - encoding + - supports-color + + google-logging-utils@0.0.2: {} + + googleapis-common@7.2.0: + dependencies: + extend: 3.0.2 + gaxios: 6.7.1 + google-auth-library: 9.15.1 + qs: 6.14.1 + url-template: 2.0.8 + uuid: 9.0.1 + transitivePeerDependencies: + - encoding + - supports-color + + googleapis@148.0.0: + dependencies: + google-auth-library: 9.15.1 + googleapis-common: 7.2.0 + transitivePeerDependencies: + - encoding + - supports-color + gopd@1.2.0: {} graceful-fs@4.2.11: @@ -5160,6 +5265,14 @@ snapshots: - encoding - supports-color + gtoken@7.1.0: + dependencies: + gaxios: 6.7.1 + jws: 4.0.1 + transitivePeerDependencies: + - encoding + - supports-color + has-flag@4.0.0: {} has-symbols@1.1.0: {} @@ -5318,6 +5431,10 @@ snapshots: dependencies: argparse: 2.0.1 + json-bigint@1.0.0: + dependencies: + bignumber.js: 9.3.1 + json-buffer@3.0.1: {} json-parse-even-better-errors@2.3.1: {} @@ -6196,8 +6313,12 @@ snapshots: dependencies: punycode: 2.3.1 + url-template@2.0.8: {} + util-deprecate@1.0.2: {} + uuid@9.0.1: {} + vary@1.1.2: {} vite-node@3.2.4(@types/node@22.19.7)(tsx@4.21.0)(yaml@2.8.2): diff --git a/src/automation/gmail.test.ts b/src/automation/gmail.test.ts new file mode 100644 index 0000000..70ce55d --- /dev/null +++ b/src/automation/gmail.test.ts @@ -0,0 +1,400 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { homedir } from 'os'; +import { GmailWatcher } from './gmail.js'; +import type { OutboundMessage } from '../channels/types.js'; + +// Mock googleapis module +vi.mock('googleapis', () => { + const mockWatch = vi.fn().mockResolvedValue({ + data: { historyId: '12345', expiration: String(Date.now() + 7 * 24 * 60 * 60 * 1000) }, + }); + + const mockHistoryList = vi.fn().mockResolvedValue({ + data: { + history: [ + { + messagesAdded: [ + { message: { id: 'msg-001' } }, + { message: { id: 'msg-002' } }, + ], + }, + ], + historyId: '12346', + }, + }); + + const mockMessagesGet = vi.fn().mockImplementation(({ id }: { id: string }) => { + return Promise.resolve({ + data: { + id, + snippet: 'Hello, this is a test email', + labelIds: ['INBOX', 'UNREAD'], + payload: { + headers: [ + { name: 'From', value: 'alice@example.com' }, + { name: 'To', value: 'bob@example.com' }, + { name: 'Subject', value: 'Test Subject' }, + { name: 'Date', value: 'Sat, 07 Feb 2026 10:00:00 +0000' }, + ], + }, + }, + }); + }); + + const mockGetProfile = vi.fn().mockResolvedValue({ + data: { emailAddress: 'bob@example.com', historyId: '12344' }, + }); + + const mockOAuth2 = vi.fn().mockImplementation(() => ({ + setCredentials: vi.fn(), + on: vi.fn(), + })); + + return { + google: { + auth: { + OAuth2: mockOAuth2, + }, + gmail: vi.fn().mockReturnValue({ + users: { + watch: mockWatch, + getProfile: mockGetProfile, + history: { list: mockHistoryList }, + messages: { get: mockMessagesGet }, + }, + }), + }, + _mocks: { + mockWatch, + mockHistoryList, + mockMessagesGet, + mockGetProfile, + mockOAuth2, + }, + }; +}); + +// Mock fs operations +vi.mock('fs', async () => { + const actual = await vi.importActual('fs'); + return { + ...actual, + existsSync: vi.fn().mockReturnValue(true), + readFileSync: vi.fn().mockImplementation((path: string) => { + if (path.includes('credentials')) { + return JSON.stringify({ + installed: { + client_id: 'test-client-id', + client_secret: 'test-client-secret', + redirect_uris: ['http://localhost'], + }, + }); + } + // Token file + return JSON.stringify({ + access_token: 'test-access-token', + refresh_token: 'test-refresh-token', + expiry_date: Date.now() + 3600000, + }); + }), + writeFileSync: vi.fn(), + mkdirSync: vi.fn(), + chmodSync: vi.fn(), + }; +}); + +function createMockConfig(overrides = {}) { + return { + enabled: true, + credentials_file: '~/.config/flynn/gmail-credentials.json', + token_file: '~/.config/flynn/gmail-token.json', + watch_labels: ['INBOX'], + poll_interval: '60s', + output: { + channel: 'telegram', + peer: '12345', + }, + message: 'New email from {{from}}: {{subject}}\n\n{{snippet}}', + ...overrides, + }; +} + +function createMockChannelLookup() { + const mockSend = vi.fn().mockResolvedValue(undefined); + return { + get: vi.fn().mockReturnValue({ send: mockSend }), + _mockSend: mockSend, + }; +} + +describe('GmailWatcher', () => { + let watcher: GmailWatcher; + let channelLookup: ReturnType; + + beforeEach(() => { + vi.useFakeTimers(); + channelLookup = createMockChannelLookup(); + }); + + afterEach(async () => { + if (watcher) { + await watcher.disconnect(); + } + vi.useRealTimers(); + vi.restoreAllMocks(); + }); + + describe('construction', () => { + it('creates with valid config', () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + expect(watcher.name).toBe('gmail'); + expect(watcher.status).toBe('disconnected'); + }); + }); + + describe('connect() with missing credentials', () => { + it('logs warning and sets status to error when credentials_file is missing', async () => { + const config = createMockConfig({ credentials_file: undefined }); + watcher = new GmailWatcher(config, channelLookup); + + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await watcher.connect(); + + expect(watcher.status).toBe('error'); + expect(errorSpy).toHaveBeenCalledWith( + expect.stringContaining('GmailWatcher: Authorization failed'), + ); + + // The actual error includes instructions + const calls = errorSpy.mock.calls.flat().join(' '); + expect(calls).toContain('flynn gmail-auth'); + + errorSpy.mockRestore(); + }); + + it('logs warning when token file does not exist', async () => { + const { existsSync } = await import('fs'); + const mockExistsSync = vi.mocked(existsSync); + // credentials file exists but token file does not + mockExistsSync.mockImplementation((path: unknown) => { + const p = String(path); + if (p.includes('credentials')) return true; + if (p.includes('token')) return false; + return true; + }); + + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + await watcher.connect(); + + expect(watcher.status).toBe('error'); + const calls = errorSpy.mock.calls.flat().join(' '); + expect(calls).toContain('flynn gmail-auth'); + + errorSpy.mockRestore(); + }); + }); + + describe('renderTemplate', () => { + it('replaces all placeholders correctly', () => { + const config = createMockConfig({ + message: 'From: {{from}} To: {{to}} Subject: {{subject}} Snippet: {{snippet}} Date: {{date}} ID: {{id}} Labels: {{labels}}', + }); + watcher = new GmailWatcher(config, channelLookup); + + const email = { + id: 'msg-123', + from: 'alice@example.com', + to: 'bob@example.com', + subject: 'Hello!', + snippet: 'How are you?', + date: '2026-02-07T10:00:00Z', + labels: ['INBOX', 'UNREAD'], + }; + + const result = watcher.renderTemplate(email); + + expect(result).toBe( + 'From: alice@example.com To: bob@example.com Subject: Hello! Snippet: How are you? Date: 2026-02-07T10:00:00Z ID: msg-123 Labels: INBOX, UNREAD', + ); + }); + + it('handles missing fields gracefully', () => { + const config = createMockConfig({ + message: '{{from}}: {{subject}}', + }); + watcher = new GmailWatcher(config, channelLookup); + + const email = { + id: '', + from: '', + to: '', + subject: '', + snippet: '', + date: '', + labels: [], + }; + + const result = watcher.renderTemplate(email); + expect(result).toBe(': '); + }); + + it('replaces multiple occurrences of the same placeholder', () => { + const config = createMockConfig({ + message: '{{from}} sent: {{subject}} (from {{from}})', + }); + watcher = new GmailWatcher(config, channelLookup); + + const email = { + id: 'msg-1', + from: 'alice@example.com', + to: 'bob@example.com', + subject: 'Hi', + snippet: '', + date: '', + labels: [], + }; + + const result = watcher.renderTemplate(email); + expect(result).toBe('alice@example.com sent: Hi (from alice@example.com)'); + }); + }); + + describe('expandPath', () => { + it('expands ~ to homedir', () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const result = watcher.expandPath('~/some/path'); + expect(result).toBe(`${homedir()}/some/path`); + }); + + it('expands bare ~', () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const result = watcher.expandPath('~'); + expect(result).toBe(homedir()); + }); + + it('resolves absolute paths unchanged', () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const result = watcher.expandPath('/tmp/test.json'); + expect(result).toBe('/tmp/test.json'); + }); + }); + + describe('handlePushNotification', () => { + it('decodes base64 data and updates historyId', async () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + // Set initial historyId via direct access + (watcher as unknown as { lastHistoryId: string }).lastHistoryId = '100'; + + const notification = { emailAddress: 'bob@example.com', historyId: '200' }; + const encoded = Buffer.from(JSON.stringify(notification)).toString('base64'); + + await watcher.handlePushNotification(encoded); + + // historyId should be updated + expect((watcher as unknown as { lastHistoryId: string }).lastHistoryId).toBe('200'); + }); + + it('ignores stale historyId', async () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + (watcher as unknown as { lastHistoryId: string }).lastHistoryId = '200'; + + const notification = { emailAddress: 'bob@example.com', historyId: '100' }; + const encoded = Buffer.from(JSON.stringify(notification)).toString('base64'); + + await watcher.handlePushNotification(encoded); + + // historyId should NOT change + expect((watcher as unknown as { lastHistoryId: string }).lastHistoryId).toBe('200'); + }); + + it('handles invalid base64 gracefully', async () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); + + await watcher.handlePushNotification('not-valid-json-after-decode!!!'); + + // Should not throw — just log error + expect(errorSpy).toHaveBeenCalled(); + errorSpy.mockRestore(); + }); + }); + + describe('disconnect', () => { + it('clears timers and sets status to disconnected', async () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + // Manually set status and timers + (watcher as unknown as { _status: string })._status = 'connected'; + (watcher as unknown as { pollTimer: ReturnType }).pollTimer = setInterval(() => {}, 60000); + (watcher as unknown as { watchTimer: ReturnType }).watchTimer = setInterval(() => {}, 60000); + + await watcher.disconnect(); + + expect(watcher.status).toBe('disconnected'); + expect((watcher as unknown as { pollTimer: unknown }).pollTimer).toBeUndefined(); + expect((watcher as unknown as { watchTimer: unknown }).watchTimer).toBeUndefined(); + }); + }); + + describe('send', () => { + it('routes to output channel via ChannelLookup', async () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const message: OutboundMessage = { text: 'Reply text' }; + + await watcher.send('some-peer', message); + + expect(channelLookup.get).toHaveBeenCalledWith('telegram'); + expect(channelLookup._mockSend).toHaveBeenCalledWith('12345', message); + }); + + it('warns when output channel not found', async () => { + const config = createMockConfig(); + const emptyLookup = { get: vi.fn().mockReturnValue(undefined) }; + watcher = new GmailWatcher(config, emptyLookup); + + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + await watcher.send('some-peer', { text: 'test' }); + + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("Output channel 'telegram' not found"), + ); + + warnSpy.mockRestore(); + }); + }); + + describe('onMessage', () => { + it('registers message handler', () => { + const config = createMockConfig(); + watcher = new GmailWatcher(config, channelLookup); + + const handler = vi.fn(); + watcher.onMessage(handler); + + // Verify handler is stored (indirectly, via processHistoryChanges) + expect((watcher as unknown as { messageHandler: unknown }).messageHandler).toBe(handler); + }); + }); +}); diff --git a/src/automation/gmail.ts b/src/automation/gmail.ts new file mode 100644 index 0000000..ee1377d --- /dev/null +++ b/src/automation/gmail.ts @@ -0,0 +1,422 @@ +import { google, type Auth } from 'googleapis'; +import { readFileSync, writeFileSync, existsSync, mkdirSync, chmodSync } from 'fs'; +import { dirname, resolve } from 'path'; +import { homedir } from 'os'; +import type { GmailConfig } from '../config/schema.js'; +import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; +import { parseInterval } from './heartbeat.js'; + +/** Minimal interface for the parts of ChannelRegistry we need. */ +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; +} + +/** Parsed email information used for template rendering. */ +interface EmailInfo { + id: string; + from: string; + to: string; + subject: string; + snippet: string; + date: string; + labels: string[]; +} + +/** Pub/Sub push notification payload from Google. */ +interface PubSubNotification { + emailAddress: string; + historyId: string; +} + +// Google Cloud Pub/Sub topic for Gmail push notifications. +// This must be pre-configured in Google Cloud Console. +const GMAIL_PUBSUB_TOPIC = 'projects/flynn-agent/topics/gmail-push'; + +// Watch expires after ~7 days; renew at 6 days (in ms). +const WATCH_RENEWAL_MS = 6 * 24 * 60 * 60 * 1000; + +/** + * GmailWatcher monitors a Gmail inbox for new messages and forwards them + * as InboundMessages via the channel adapter pattern. + * + * Supports two modes: + * - **Pub/Sub push**: Google sends push notifications when new emails arrive. + * Requires a POST /gmail/push route on the gateway. + * - **Polling fallback**: Periodically polls Gmail History API for changes. + * + * Authentication uses OAuth2 with a stored refresh token. + */ +export class GmailWatcher implements ChannelAdapter { + readonly name = 'gmail'; + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private oauth2Client?: Auth.OAuth2Client; + private lastHistoryId?: string; + private pollTimer?: ReturnType; + private watchTimer?: ReturnType; + private readonly config: NonNullable; + + constructor( + config: NonNullable, + private readonly channelLookup: ChannelLookup, + ) { + this.config = config; + } + + get status(): ChannelStatus { + return this._status; + } + + async connect(): Promise { + this._status = 'connecting'; + + try { + this.oauth2Client = await this.authorize(); + } catch (error) { + const errMsg = error instanceof Error ? error.message : 'Unknown error'; + console.error(`GmailWatcher: Authorization failed — ${errMsg}`); + console.error('GmailWatcher: Run "flynn gmail-auth" to set up Gmail credentials.'); + this._status = 'error'; + return; + } + + // Set up Gmail push watch (Pub/Sub) + try { + await this.setupWatch(); + } catch (error) { + const errMsg = error instanceof Error ? error.message : 'Unknown error'; + console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}`); + } + + // Start polling fallback + const pollMs = parseInterval(this.config.poll_interval ?? '60s'); + this.pollTimer = setInterval(() => { + this.pollForNewMessages().catch((err) => { + console.error('GmailWatcher: Poll error —', err instanceof Error ? err.message : err); + }); + }, pollMs); + + this._status = 'connected'; + console.log(`GmailWatcher: Connected (poll_interval=${this.config.poll_interval ?? '60s'})`); + } + + async disconnect(): Promise { + if (this.pollTimer) { + clearInterval(this.pollTimer); + this.pollTimer = undefined; + } + if (this.watchTimer) { + clearInterval(this.watchTimer); + this.watchTimer = undefined; + } + this.oauth2Client = undefined; + this._status = 'disconnected'; + } + + async send(peerId: string, message: OutboundMessage): Promise { + // Route responses to the configured output channel + const outputAdapter = this.channelLookup.get(this.config.output.channel); + if (!outputAdapter) { + console.warn(`GmailWatcher: Output channel '${this.config.output.channel}' not found`); + return; + } + + await outputAdapter.send(this.config.output.peer, message); + } + + onMessage(handler: (msg: InboundMessage) => void): void { + this.messageHandler = handler; + } + + /** + * Handle a Pub/Sub push notification from Google. + * Called by the gateway when POST /gmail/push is received. + * @param data Base64-encoded Pub/Sub message data + */ + async handlePushNotification(data: string): Promise { + try { + const decoded = Buffer.from(data, 'base64').toString('utf-8'); + const notification = JSON.parse(decoded) as PubSubNotification; + + if (!notification.historyId) { + console.warn('GmailWatcher: Push notification missing historyId'); + return; + } + + // Only process if the new historyId is greater than our last known one + if (this.lastHistoryId && BigInt(notification.historyId) <= BigInt(this.lastHistoryId)) { + return; + } + + if (this.lastHistoryId) { + await this.processHistoryChanges(this.lastHistoryId); + } + + this.lastHistoryId = notification.historyId; + } catch (error) { + console.error('GmailWatcher: Push notification error —', error instanceof Error ? error.message : error); + } + } + + /** + * Authorize with Gmail using stored OAuth2 credentials. + * Reads client credentials and stored token from config paths. + */ + private async authorize(): Promise { + const credentialsPath = this.config.credentials_file; + if (!credentialsPath) { + throw new Error('No credentials_file configured. Set automation.gmail.credentials_file in config.'); + } + + const expandedCredsPath = this.expandPath(credentialsPath); + if (!existsSync(expandedCredsPath)) { + throw new Error(`Credentials file not found: ${expandedCredsPath}`); + } + + const credentials = JSON.parse(readFileSync(expandedCredsPath, 'utf-8')); + const { client_id, client_secret, redirect_uris } = credentials.installed ?? credentials.web ?? {}; + + if (!client_id || !client_secret) { + throw new Error('Invalid credentials file — missing client_id or client_secret'); + } + + const oauth2Client = new google.auth.OAuth2( + client_id, + client_secret, + redirect_uris?.[0] ?? 'http://localhost', + ); + + // Load stored token + const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json'); + if (!existsSync(tokenPath)) { + throw new Error( + `Token file not found: ${tokenPath}. Run "flynn gmail-auth" to authenticate.`, + ); + } + + const token = JSON.parse(readFileSync(tokenPath, 'utf-8')); + oauth2Client.setCredentials(token); + + // Auto-save refreshed tokens + oauth2Client.on('tokens', (newTokens) => { + const merged = { ...token, ...newTokens }; + this.saveToken(merged); + }); + + return oauth2Client; + } + + /** + * Set up Gmail Pub/Sub watch for push notifications. + * Calls gmail.users.watch() and schedules renewal before expiry. + */ + private async setupWatch(): Promise { + if (!this.oauth2Client) return; + + const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client }); + + const watchResponse = await gmail.users.watch({ + userId: 'me', + requestBody: { + labelIds: this.config.watch_labels ?? ['INBOX'], + topicName: GMAIL_PUBSUB_TOPIC, + }, + }); + + if (watchResponse.data.historyId) { + this.lastHistoryId = watchResponse.data.historyId.toString(); + } + + console.log(`GmailWatcher: Watch registered (historyId=${this.lastHistoryId})`); + + // Schedule renewal before watch expiry (~7 days) + this.watchTimer = setInterval(() => { + this.setupWatch().catch((err) => { + console.error('GmailWatcher: Watch renewal failed —', err instanceof Error ? err.message : err); + }); + }, WATCH_RENEWAL_MS); + } + + /** + * Poll Gmail History API for new messages since lastHistoryId. + * Fallback mechanism when Pub/Sub push is not available. + */ + private async pollForNewMessages(): Promise { + if (!this.oauth2Client) return; + + const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client }); + + // If no historyId yet, initialize it from the profile + if (!this.lastHistoryId) { + try { + const profile = await gmail.users.getProfile({ userId: 'me' }); + if (profile.data.historyId) { + this.lastHistoryId = profile.data.historyId.toString(); + } + } catch (error) { + console.error('GmailWatcher: Failed to get profile —', error instanceof Error ? error.message : error); + } + return; // First poll — just establish the baseline + } + + await this.processHistoryChanges(this.lastHistoryId); + } + + /** + * Fetch history changes since the given historyId and process new messages. + * Updates lastHistoryId to the latest value from the response. + */ + private async processHistoryChanges(startHistoryId: string): Promise { + if (!this.oauth2Client) return; + + const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client }); + + try { + const historyResponse = await gmail.users.history.list({ + userId: 'me', + startHistoryId: startHistoryId, + labelId: (this.config.watch_labels ?? ['INBOX'])[0], + historyTypes: ['messageAdded'], + }); + + const history = historyResponse.data.history ?? []; + const processedIds = new Set(); + + for (const record of history) { + const addedMessages = record.messagesAdded ?? []; + for (const added of addedMessages) { + const messageId = added.message?.id; + if (!messageId || processedIds.has(messageId)) continue; + processedIds.add(messageId); + + const email = await this.getMessageDetails(messageId); + if (!email) continue; + + // Skip messages before history_start if configured + if (this.config.history_start) { + const emailDate = new Date(email.date); + const startDate = new Date(this.config.history_start); + if (emailDate < startDate) continue; + } + + const text = this.renderTemplate(email); + + const msg: InboundMessage = { + id: `gmail-${email.id}-${Date.now()}`, + channel: 'gmail', + senderId: email.from, + senderName: `gmail:${email.from}`, + text, + timestamp: Date.now(), + metadata: { + emailId: email.id, + from: email.from, + to: email.to, + subject: email.subject, + labels: email.labels, + }, + }; + + this.messageHandler?.(msg); + } + } + + // Update historyId to the latest + if (historyResponse.data.historyId) { + this.lastHistoryId = historyResponse.data.historyId.toString(); + } + } catch (error: unknown) { + // 404 means historyId is too old — reset by fetching profile + if (error instanceof Error && 'code' in error && (error as { code: number }).code === 404) { + console.warn('GmailWatcher: History expired, re-syncing...'); + try { + const profile = await gmail.users.getProfile({ userId: 'me' }); + if (profile.data.historyId) { + this.lastHistoryId = profile.data.historyId.toString(); + } + } catch (profileError) { + console.error('GmailWatcher: Failed to re-sync profile —', profileError instanceof Error ? profileError.message : profileError); + } + } else { + throw error; + } + } + } + + /** + * Fetch full message details by ID and extract relevant headers. + */ + private async getMessageDetails(messageId: string): Promise { + if (!this.oauth2Client) return null; + + const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client }); + + try { + const msg = await gmail.users.messages.get({ + userId: 'me', + id: messageId, + format: 'metadata', + metadataHeaders: ['From', 'To', 'Subject', 'Date'], + }); + + const headers = msg.data.payload?.headers ?? []; + const getHeader = (name: string): string => + headers.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value ?? ''; + + return { + id: messageId, + from: getHeader('From'), + to: getHeader('To'), + subject: getHeader('Subject'), + snippet: msg.data.snippet ?? '', + date: getHeader('Date'), + labels: msg.data.labelIds ?? [], + }; + } catch (error) { + console.error(`GmailWatcher: Failed to fetch message ${messageId} —`, error instanceof Error ? error.message : error); + return null; + } + } + + /** + * Render the message template with email placeholders. + * Supported: {{from}}, {{to}}, {{subject}}, {{snippet}}, {{date}}, {{id}}, {{labels}} + */ + renderTemplate(email: EmailInfo): string { + return this.config.message + .replace(/\{\{from\}\}/g, email.from) + .replace(/\{\{to\}\}/g, email.to) + .replace(/\{\{subject\}\}/g, email.subject) + .replace(/\{\{snippet\}\}/g, email.snippet) + .replace(/\{\{date\}\}/g, email.date) + .replace(/\{\{id\}\}/g, email.id) + .replace(/\{\{labels\}\}/g, email.labels.join(', ')); + } + + /** + * Expand ~ to the user's home directory. + */ + expandPath(p: string): string { + if (p.startsWith('~/') || p === '~') { + return resolve(homedir(), p.slice(2)); + } + return resolve(p); + } + + /** + * Save token to disk with restrictive permissions (0o600). + */ + private saveToken(token: unknown): void { + const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json'); + const dir = dirname(tokenPath); + if (!existsSync(dir)) { + mkdirSync(dir, { recursive: true }); + } + writeFileSync(tokenPath, JSON.stringify(token, null, 2), 'utf-8'); + try { + chmodSync(tokenPath, 0o600); + } catch { + // chmod may fail on some filesystems — not critical + } + } +} diff --git a/src/automation/index.ts b/src/automation/index.ts index 2c1e3d0..4800bbf 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -1,4 +1,5 @@ export { CronScheduler } from './cron.js'; export { WebhookHandler } from './webhooks.js'; +export { GmailWatcher } from './gmail.js'; export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; diff --git a/src/config/schema.ts b/src/config/schema.ts index 791e4b2..92c4d49 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -119,6 +119,20 @@ const webhookSchema = z.object({ enabled: z.boolean().default(true), }); +const gmailSchema = z.object({ + enabled: z.boolean().default(false), + credentials_file: z.string().optional(), + token_file: z.string().default('~/.config/flynn/gmail-token.json'), + watch_labels: z.array(z.string()).default(['INBOX']), + poll_interval: z.string().default('60s'), + history_start: z.string().optional(), // ISO date string — only process emails after this date + output: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }), + message: z.string().default('New email from {{from}}: {{subject}}\n\n{{snippet}}'), +}).optional(); + const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']); const heartbeatSchema = z.object({ @@ -136,6 +150,7 @@ const heartbeatSchema = z.object({ const automationSchema = z.object({ cron: z.array(cronJobSchema).default([]), webhooks: z.array(webhookSchema).default([]), + gmail: gmailSchema, heartbeat: heartbeatSchema, }).default({}); @@ -343,6 +358,7 @@ export type TelegramConfig = z.infer; export type ModelConfig = z.infer; export type CronJobConfig = z.infer; export type WebhookConfig = z.infer; +export type GmailConfig = z.infer; export type AgentsConfig = z.infer; export type CompactionConfig = z.infer; export type MemoryConfig = z.infer; diff --git a/src/daemon/index.ts b/src/daemon/index.ts index db67519..97c5a77 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -17,7 +17,7 @@ import type { EmbeddingProvider as EmbeddingProviderInterface } from '../memory/ import { createMemoryTools } from '../tools/builtin/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js'; -import { CronScheduler, WebhookHandler, HeartbeatMonitor } from '../automation/index.js'; +import { CronScheduler, WebhookHandler, HeartbeatMonitor, GmailWatcher } from '../automation/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; @@ -884,6 +884,15 @@ export async function startDaemon(config: Config): Promise { console.log(`Registered ${config.automation.webhooks.length} webhook(s)`); } + // Register Gmail watcher adapter (if configured and enabled) + let gmailWatcher: GmailWatcher | undefined; + if (config.automation.gmail?.enabled) { + gmailWatcher = new GmailWatcher(config.automation.gmail, channelRegistry); + channelRegistry.register(gmailWatcher); + gateway.setGmailHandler(gmailWatcher); + console.log('Registered Gmail watcher'); + } + // ── Register Tier 1 agent tools ───────────────────────────── // Session management tools (list, history, create, delete) diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 494d612..a3834be 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -25,6 +25,7 @@ import type { Config } from '../config/index.js'; import type { ToolRegistry } from '../tools/registry.js'; import type { ToolExecutor } from '../tools/executor.js'; import type { WebhookHandler } from '../automation/webhooks.js'; +import type { GmailWatcher } from '../automation/gmail.js'; export interface GatewayServerConfig { port: number; @@ -45,6 +46,8 @@ export interface GatewayServerConfig { channelRegistry?: { list(): Array<{ readonly name: string; readonly status: string }> }; /** Optional webhook handler for inbound webhook HTTP routes. */ webhookHandler?: WebhookHandler; + /** Optional Gmail handler for Pub/Sub push notifications. */ + gmailHandler?: GmailWatcher; } export class GatewayServer { @@ -223,6 +226,25 @@ export class GatewayServer { } } + // Gmail Pub/Sub push route — bypass gateway auth (Google sends push notifications directly) + if (this.config.gmailHandler && req.method === 'POST' && req.url?.startsWith('/gmail/push')) { + try { + const body = await this.readRequestBody(req); + const parsed = JSON.parse(body) as { message?: { data?: string } }; + const data = parsed?.message?.data; + if (data) { + await this.config.gmailHandler.handlePushNotification(data); + } + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ ok: true })); + } catch (err) { + console.error('Gmail push handler error:', err instanceof Error ? err.message : err); + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Invalid request' })); + } + return; + } + // Apply auth to HTTP requests when configured const authConfig = this.config.auth ?? {}; if (this.config.authHttp !== false && authConfig.token) { @@ -299,4 +321,19 @@ export class GatewayServer { setWebhookHandler(handler: WebhookHandler): void { this.config.webhookHandler = handler; } + + /** Set the Gmail handler for Pub/Sub push notifications (late binding). */ + setGmailHandler(handler: GmailWatcher): void { + this.config.gmailHandler = handler; + } + + /** Read the full request body as a string. */ + private readRequestBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + req.on('data', (chunk: Buffer) => chunks.push(chunk)); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + req.on('error', reject); + }); + } }