feat: add Gmail Pub/Sub watcher for inbound email automation

New ChannelAdapter that monitors Gmail via Google Cloud Pub/Sub push
notifications with polling fallback. Supports OAuth2 auth, configurable
watch labels, template rendering with email metadata placeholders
(from, to, subject, snippet, date, id, labels).

Wired into daemon lifecycle and gateway (POST /gmail/push endpoint).
Includes 16 tests covering auth, templates, push notifications, and
channel routing.
This commit is contained in:
William Valentin
2026-02-07 15:39:24 -08:00
parent 131d23989c
commit 06438bb44f
8 changed files with 1008 additions and 1 deletions
+1
View File
@@ -50,6 +50,7 @@
"commander": "^14.0.3", "commander": "^14.0.3",
"croner": "^10.0.1", "croner": "^10.0.1",
"discord.js": "^14.25.1", "discord.js": "^14.25.1",
"googleapis": "^148.0.0",
"grammy": "^1.35.0", "grammy": "^1.35.0",
"ink": "^6.0.0", "ink": "^6.0.0",
"ink-text-input": "^6.0.0", "ink-text-input": "^6.0.0",
+121
View File
@@ -41,6 +41,9 @@ importers:
discord.js: discord.js:
specifier: ^14.25.1 specifier: ^14.25.1
version: 14.25.1 version: 14.25.1
googleapis:
specifier: ^148.0.0
version: 148.0.0
grammy: grammy:
specifier: ^1.35.0 specifier: ^1.35.0
version: 1.39.3 version: 1.39.3
@@ -1206,6 +1209,9 @@ packages:
resolution: {integrity: sha512-QxD8cf2eVqJOOz63z6JIN9BzvVs/dlySa5HGSBH5xtR8dPteIRQnBxxKqkNTiT6jbDTF6jAfrd4oMcND9RGbQg==} resolution: {integrity: sha512-QxD8cf2eVqJOOz63z6JIN9BzvVs/dlySa5HGSBH5xtR8dPteIRQnBxxKqkNTiT6jbDTF6jAfrd4oMcND9RGbQg==}
engines: {node: '>=0.6'} engines: {node: '>=0.6'}
bignumber.js@9.3.1:
resolution: {integrity: sha512-Ko0uX15oIUS7wJ3Rb30Fs6SkVbLmPBAKdlm7q9+ak9bbIeFf0MwuBsQV6z7+X768/cHsfg+WlysDWJcmthjsjQ==}
binary@0.3.0: binary@0.3.0:
resolution: {integrity: sha512-D4H1y5KYwpJgK8wk1Cue5LLPgmwHKYSChkbspQg5JtVuR5ulGckxfR62H3AE9UDkdMC8yyXlqYihuz3Aqg2XZg==} resolution: {integrity: sha512-D4H1y5KYwpJgK8wk1Cue5LLPgmwHKYSChkbspQg5JtVuR5ulGckxfR62H3AE9UDkdMC8yyXlqYihuz3Aqg2XZg==}
@@ -1677,6 +1683,9 @@ packages:
resolution: {integrity: sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==} resolution: {integrity: sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==}
engines: {node: '>= 18'} engines: {node: '>= 18'}
extend@3.0.2:
resolution: {integrity: sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==}
extract-zip@2.0.1: extract-zip@2.0.1:
resolution: {integrity: sha512-GDhU9ntwuKyGXdZBUgTIe+vXnWj0fppUEtMDL0+idd5Sta8TGpHssn/eusA9mrPr9qNDym6SxAYZjNvCn/9RBg==} resolution: {integrity: sha512-GDhU9ntwuKyGXdZBUgTIe+vXnWj0fppUEtMDL0+idd5Sta8TGpHssn/eusA9mrPr9qNDym6SxAYZjNvCn/9RBg==}
engines: {node: '>= 10.17.0'} engines: {node: '>= 10.17.0'}
@@ -1791,6 +1800,14 @@ packages:
function-bind@1.1.2: function-bind@1.1.2:
resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==} resolution: {integrity: sha512-7XHNxH7qX9xG5mIwxkhumTox/MIRNcOgDrxWsMt2pAr23WHp6MrRlN7FBSFpCpr+oVO0F744iUgR82nJMfG2SA==}
gaxios@6.7.1:
resolution: {integrity: sha512-LDODD4TMYx7XXdpwxAVRAIAuB0bzv0s+ywFonY46k126qzQHT9ygyoa9tncmOiQmmDrik65UYsEkv3lbfqQ3yQ==}
engines: {node: '>=14'}
gcp-metadata@6.1.1:
resolution: {integrity: sha512-a4tiq7E0/5fTjxPAaH4jpjkSv/uCaU2p5KC6HVGrvl0cDjA8iBZv4vv1gyzlmK0ZUKqwpOyQMKzZQe3lTit77A==}
engines: {node: '>=14'}
get-caller-file@2.0.5: get-caller-file@2.0.5:
resolution: {integrity: sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==} resolution: {integrity: sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==}
engines: {node: 6.* || 8.* || >= 10.*} engines: {node: 6.* || 8.* || >= 10.*}
@@ -1833,6 +1850,22 @@ packages:
resolution: {integrity: sha512-oahGvuMGQlPw/ivIYBjVSrWAfWLBeku5tpPE2fOPLi+WHffIWbuh2tCjhyQhTBPMf5E9jDEH4FOmTYgYwbKwtQ==} resolution: {integrity: sha512-oahGvuMGQlPw/ivIYBjVSrWAfWLBeku5tpPE2fOPLi+WHffIWbuh2tCjhyQhTBPMf5E9jDEH4FOmTYgYwbKwtQ==}
engines: {node: '>=18'} engines: {node: '>=18'}
google-auth-library@9.15.1:
resolution: {integrity: sha512-Jb6Z0+nvECVz+2lzSMt9u98UsoakXxA2HGHMCxh+so3n90XgYWkq5dur19JAJV7ONiJY22yBTyJB1TSkvPq9Ng==}
engines: {node: '>=14'}
google-logging-utils@0.0.2:
resolution: {integrity: sha512-NEgUnEcBiP5HrPzufUkBzJOD/Sxsco3rLNo1F1TNf7ieU8ryUzBhqba8r756CjLX7rn3fHl6iLEwPYuqpoKgQQ==}
engines: {node: '>=14'}
googleapis-common@7.2.0:
resolution: {integrity: sha512-/fhDZEJZvOV3X5jmD+fKxMqma5q2Q9nZNSF3kn1F18tpxmA86BcTxAGBQdM0N89Z3bEaIs+HVznSmFJEAmMTjA==}
engines: {node: '>=14.0.0'}
googleapis@148.0.0:
resolution: {integrity: sha512-8PDG5VItm6E1TdZWDqtRrUJSlBcNwz0/MwCa6AL81y/RxPGXJRUwKqGZfCoVX1ZBbfr3I4NkDxBmeTyOAZSWqw==}
engines: {node: '>=14.0.0'}
gopd@1.2.0: gopd@1.2.0:
resolution: {integrity: sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==} resolution: {integrity: sha512-ZUKRh6/kUFoAiTAtTYPZJ3hw9wNxx+BIBOijnlG9PnrJsCcSjs1wyyD6vJpaYtgnzDrKYRSqf3OO6Rfa93xsRg==}
engines: {node: '>= 0.4'} engines: {node: '>= 0.4'}
@@ -1844,6 +1877,10 @@ packages:
resolution: {integrity: sha512-7arRRoOtOh9UwMwANZ475kJrWV6P3/EGNooeHlY0/SwZv4t3ZZ3Uiz9cAXK8Zg9xSdgmm8T21kx6n7SZaWvOcw==} resolution: {integrity: sha512-7arRRoOtOh9UwMwANZ475kJrWV6P3/EGNooeHlY0/SwZv4t3ZZ3Uiz9cAXK8Zg9xSdgmm8T21kx6n7SZaWvOcw==}
engines: {node: ^12.20.0 || >=14.13.1} engines: {node: ^12.20.0 || >=14.13.1}
gtoken@7.1.0:
resolution: {integrity: sha512-pCcEwRi+TKpMlxAQObHDQ56KawURgyAf6jtIY046fJ5tIv3zDe/LEIubckAO8fj6JnAxLdmWkUfNyulQ2iKdEw==}
engines: {node: '>=14.0.0'}
has-flag@4.0.0: has-flag@4.0.0:
resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==} resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==}
engines: {node: '>=8'} engines: {node: '>=8'}
@@ -2002,6 +2039,9 @@ packages:
resolution: {integrity: sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==} resolution: {integrity: sha512-qQKT4zQxXl8lLwBtHMWwaTcGfFOZviOJet3Oy/xmGk2gZH677CJM9EvtfdSkgWcATZhj/55JZ0rmy3myCT5lsA==}
hasBin: true hasBin: true
json-bigint@1.0.0:
resolution: {integrity: sha512-SiPv/8VpZuWbvLSMtTDU8hEfrZWg/mH/nV/b4o0CYbSxu1UIQPLdwKOCIyLQX+VIPO5vrLX3i8qtqFyhdPSUSQ==}
json-buffer@3.0.1: json-buffer@3.0.1:
resolution: {integrity: sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==} resolution: {integrity: sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==}
@@ -2790,9 +2830,16 @@ packages:
uri-js@4.4.1: uri-js@4.4.1:
resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==} resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==}
url-template@2.0.8:
resolution: {integrity: sha512-XdVKMF4SJ0nP/O7XIPB0JwAEuT9lDIYnNsK8yGVe43y0AWoKeJNdv3ZNWh7ksJ6KqQFjOO6ox/VEitLnaVNufw==}
util-deprecate@1.0.2: util-deprecate@1.0.2:
resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==} resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==}
uuid@9.0.1:
resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==}
hasBin: true
vary@1.1.2: vary@1.1.2:
resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==}
engines: {node: '>= 0.8'} engines: {node: '>= 0.8'}
@@ -4437,6 +4484,8 @@ snapshots:
big-integer@1.6.52: big-integer@1.6.52:
optional: true optional: true
bignumber.js@9.3.1: {}
binary@0.3.0: binary@0.3.0:
dependencies: dependencies:
buffers: 0.1.1 buffers: 0.1.1
@@ -4977,6 +5026,8 @@ snapshots:
transitivePeerDependencies: transitivePeerDependencies:
- supports-color - supports-color
extend@3.0.2: {}
extract-zip@2.0.1: extract-zip@2.0.1:
dependencies: dependencies:
debug: 4.4.3 debug: 4.4.3
@@ -5089,6 +5140,26 @@ snapshots:
function-bind@1.1.2: {} function-bind@1.1.2: {}
gaxios@6.7.1:
dependencies:
extend: 3.0.2
https-proxy-agent: 7.0.6
is-stream: 2.0.1
node-fetch: 2.7.0
uuid: 9.0.1
transitivePeerDependencies:
- encoding
- supports-color
gcp-metadata@6.1.1:
dependencies:
gaxios: 6.7.1
google-logging-utils: 0.0.2
json-bigint: 1.0.0
transitivePeerDependencies:
- encoding
- supports-color
get-caller-file@2.0.5: {} get-caller-file@2.0.5: {}
get-east-asian-width@1.4.0: {} get-east-asian-width@1.4.0: {}
@@ -5145,6 +5216,40 @@ snapshots:
globals@14.0.0: {} globals@14.0.0: {}
google-auth-library@9.15.1:
dependencies:
base64-js: 1.5.1
ecdsa-sig-formatter: 1.0.11
gaxios: 6.7.1
gcp-metadata: 6.1.1
gtoken: 7.1.0
jws: 4.0.1
transitivePeerDependencies:
- encoding
- supports-color
google-logging-utils@0.0.2: {}
googleapis-common@7.2.0:
dependencies:
extend: 3.0.2
gaxios: 6.7.1
google-auth-library: 9.15.1
qs: 6.14.1
url-template: 2.0.8
uuid: 9.0.1
transitivePeerDependencies:
- encoding
- supports-color
googleapis@148.0.0:
dependencies:
google-auth-library: 9.15.1
googleapis-common: 7.2.0
transitivePeerDependencies:
- encoding
- supports-color
gopd@1.2.0: {} gopd@1.2.0: {}
graceful-fs@4.2.11: graceful-fs@4.2.11:
@@ -5160,6 +5265,14 @@ snapshots:
- encoding - encoding
- supports-color - supports-color
gtoken@7.1.0:
dependencies:
gaxios: 6.7.1
jws: 4.0.1
transitivePeerDependencies:
- encoding
- supports-color
has-flag@4.0.0: {} has-flag@4.0.0: {}
has-symbols@1.1.0: {} has-symbols@1.1.0: {}
@@ -5318,6 +5431,10 @@ snapshots:
dependencies: dependencies:
argparse: 2.0.1 argparse: 2.0.1
json-bigint@1.0.0:
dependencies:
bignumber.js: 9.3.1
json-buffer@3.0.1: {} json-buffer@3.0.1: {}
json-parse-even-better-errors@2.3.1: {} json-parse-even-better-errors@2.3.1: {}
@@ -6196,8 +6313,12 @@ snapshots:
dependencies: dependencies:
punycode: 2.3.1 punycode: 2.3.1
url-template@2.0.8: {}
util-deprecate@1.0.2: {} util-deprecate@1.0.2: {}
uuid@9.0.1: {}
vary@1.1.2: {} vary@1.1.2: {}
vite-node@3.2.4(@types/node@22.19.7)(tsx@4.21.0)(yaml@2.8.2): vite-node@3.2.4(@types/node@22.19.7)(tsx@4.21.0)(yaml@2.8.2):
+400
View File
@@ -0,0 +1,400 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { homedir } from 'os';
import { GmailWatcher } from './gmail.js';
import type { OutboundMessage } from '../channels/types.js';
// Mock googleapis module
vi.mock('googleapis', () => {
const mockWatch = vi.fn().mockResolvedValue({
data: { historyId: '12345', expiration: String(Date.now() + 7 * 24 * 60 * 60 * 1000) },
});
const mockHistoryList = vi.fn().mockResolvedValue({
data: {
history: [
{
messagesAdded: [
{ message: { id: 'msg-001' } },
{ message: { id: 'msg-002' } },
],
},
],
historyId: '12346',
},
});
const mockMessagesGet = vi.fn().mockImplementation(({ id }: { id: string }) => {
return Promise.resolve({
data: {
id,
snippet: 'Hello, this is a test email',
labelIds: ['INBOX', 'UNREAD'],
payload: {
headers: [
{ name: 'From', value: 'alice@example.com' },
{ name: 'To', value: 'bob@example.com' },
{ name: 'Subject', value: 'Test Subject' },
{ name: 'Date', value: 'Sat, 07 Feb 2026 10:00:00 +0000' },
],
},
},
});
});
const mockGetProfile = vi.fn().mockResolvedValue({
data: { emailAddress: 'bob@example.com', historyId: '12344' },
});
const mockOAuth2 = vi.fn().mockImplementation(() => ({
setCredentials: vi.fn(),
on: vi.fn(),
}));
return {
google: {
auth: {
OAuth2: mockOAuth2,
},
gmail: vi.fn().mockReturnValue({
users: {
watch: mockWatch,
getProfile: mockGetProfile,
history: { list: mockHistoryList },
messages: { get: mockMessagesGet },
},
}),
},
_mocks: {
mockWatch,
mockHistoryList,
mockMessagesGet,
mockGetProfile,
mockOAuth2,
},
};
});
// Mock fs operations
vi.mock('fs', async () => {
const actual = await vi.importActual<typeof import('fs')>('fs');
return {
...actual,
existsSync: vi.fn().mockReturnValue(true),
readFileSync: vi.fn().mockImplementation((path: string) => {
if (path.includes('credentials')) {
return JSON.stringify({
installed: {
client_id: 'test-client-id',
client_secret: 'test-client-secret',
redirect_uris: ['http://localhost'],
},
});
}
// Token file
return JSON.stringify({
access_token: 'test-access-token',
refresh_token: 'test-refresh-token',
expiry_date: Date.now() + 3600000,
});
}),
writeFileSync: vi.fn(),
mkdirSync: vi.fn(),
chmodSync: vi.fn(),
};
});
function createMockConfig(overrides = {}) {
return {
enabled: true,
credentials_file: '~/.config/flynn/gmail-credentials.json',
token_file: '~/.config/flynn/gmail-token.json',
watch_labels: ['INBOX'],
poll_interval: '60s',
output: {
channel: 'telegram',
peer: '12345',
},
message: 'New email from {{from}}: {{subject}}\n\n{{snippet}}',
...overrides,
};
}
function createMockChannelLookup() {
const mockSend = vi.fn().mockResolvedValue(undefined);
return {
get: vi.fn().mockReturnValue({ send: mockSend }),
_mockSend: mockSend,
};
}
describe('GmailWatcher', () => {
let watcher: GmailWatcher;
let channelLookup: ReturnType<typeof createMockChannelLookup>;
beforeEach(() => {
vi.useFakeTimers();
channelLookup = createMockChannelLookup();
});
afterEach(async () => {
if (watcher) {
await watcher.disconnect();
}
vi.useRealTimers();
vi.restoreAllMocks();
});
describe('construction', () => {
it('creates with valid config', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
expect(watcher.name).toBe('gmail');
expect(watcher.status).toBe('disconnected');
});
});
describe('connect() with missing credentials', () => {
it('logs warning and sets status to error when credentials_file is missing', async () => {
const config = createMockConfig({ credentials_file: undefined });
watcher = new GmailWatcher(config, channelLookup);
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
await watcher.connect();
expect(watcher.status).toBe('error');
expect(errorSpy).toHaveBeenCalledWith(
expect.stringContaining('GmailWatcher: Authorization failed'),
);
// The actual error includes instructions
const calls = errorSpy.mock.calls.flat().join(' ');
expect(calls).toContain('flynn gmail-auth');
errorSpy.mockRestore();
});
it('logs warning when token file does not exist', async () => {
const { existsSync } = await import('fs');
const mockExistsSync = vi.mocked(existsSync);
// credentials file exists but token file does not
mockExistsSync.mockImplementation((path: unknown) => {
const p = String(path);
if (p.includes('credentials')) return true;
if (p.includes('token')) return false;
return true;
});
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
await watcher.connect();
expect(watcher.status).toBe('error');
const calls = errorSpy.mock.calls.flat().join(' ');
expect(calls).toContain('flynn gmail-auth');
errorSpy.mockRestore();
});
});
describe('renderTemplate', () => {
it('replaces all placeholders correctly', () => {
const config = createMockConfig({
message: 'From: {{from}} To: {{to}} Subject: {{subject}} Snippet: {{snippet}} Date: {{date}} ID: {{id}} Labels: {{labels}}',
});
watcher = new GmailWatcher(config, channelLookup);
const email = {
id: 'msg-123',
from: 'alice@example.com',
to: 'bob@example.com',
subject: 'Hello!',
snippet: 'How are you?',
date: '2026-02-07T10:00:00Z',
labels: ['INBOX', 'UNREAD'],
};
const result = watcher.renderTemplate(email);
expect(result).toBe(
'From: alice@example.com To: bob@example.com Subject: Hello! Snippet: How are you? Date: 2026-02-07T10:00:00Z ID: msg-123 Labels: INBOX, UNREAD',
);
});
it('handles missing fields gracefully', () => {
const config = createMockConfig({
message: '{{from}}: {{subject}}',
});
watcher = new GmailWatcher(config, channelLookup);
const email = {
id: '',
from: '',
to: '',
subject: '',
snippet: '',
date: '',
labels: [],
};
const result = watcher.renderTemplate(email);
expect(result).toBe(': ');
});
it('replaces multiple occurrences of the same placeholder', () => {
const config = createMockConfig({
message: '{{from}} sent: {{subject}} (from {{from}})',
});
watcher = new GmailWatcher(config, channelLookup);
const email = {
id: 'msg-1',
from: 'alice@example.com',
to: 'bob@example.com',
subject: 'Hi',
snippet: '',
date: '',
labels: [],
};
const result = watcher.renderTemplate(email);
expect(result).toBe('alice@example.com sent: Hi (from alice@example.com)');
});
});
describe('expandPath', () => {
it('expands ~ to homedir', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const result = watcher.expandPath('~/some/path');
expect(result).toBe(`${homedir()}/some/path`);
});
it('expands bare ~', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const result = watcher.expandPath('~');
expect(result).toBe(homedir());
});
it('resolves absolute paths unchanged', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const result = watcher.expandPath('/tmp/test.json');
expect(result).toBe('/tmp/test.json');
});
});
describe('handlePushNotification', () => {
it('decodes base64 data and updates historyId', async () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
// Set initial historyId via direct access
(watcher as unknown as { lastHistoryId: string }).lastHistoryId = '100';
const notification = { emailAddress: 'bob@example.com', historyId: '200' };
const encoded = Buffer.from(JSON.stringify(notification)).toString('base64');
await watcher.handlePushNotification(encoded);
// historyId should be updated
expect((watcher as unknown as { lastHistoryId: string }).lastHistoryId).toBe('200');
});
it('ignores stale historyId', async () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
(watcher as unknown as { lastHistoryId: string }).lastHistoryId = '200';
const notification = { emailAddress: 'bob@example.com', historyId: '100' };
const encoded = Buffer.from(JSON.stringify(notification)).toString('base64');
await watcher.handlePushNotification(encoded);
// historyId should NOT change
expect((watcher as unknown as { lastHistoryId: string }).lastHistoryId).toBe('200');
});
it('handles invalid base64 gracefully', async () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
await watcher.handlePushNotification('not-valid-json-after-decode!!!');
// Should not throw — just log error
expect(errorSpy).toHaveBeenCalled();
errorSpy.mockRestore();
});
});
describe('disconnect', () => {
it('clears timers and sets status to disconnected', async () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
// Manually set status and timers
(watcher as unknown as { _status: string })._status = 'connected';
(watcher as unknown as { pollTimer: ReturnType<typeof setInterval> }).pollTimer = setInterval(() => {}, 60000);
(watcher as unknown as { watchTimer: ReturnType<typeof setInterval> }).watchTimer = setInterval(() => {}, 60000);
await watcher.disconnect();
expect(watcher.status).toBe('disconnected');
expect((watcher as unknown as { pollTimer: unknown }).pollTimer).toBeUndefined();
expect((watcher as unknown as { watchTimer: unknown }).watchTimer).toBeUndefined();
});
});
describe('send', () => {
it('routes to output channel via ChannelLookup', async () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const message: OutboundMessage = { text: 'Reply text' };
await watcher.send('some-peer', message);
expect(channelLookup.get).toHaveBeenCalledWith('telegram');
expect(channelLookup._mockSend).toHaveBeenCalledWith('12345', message);
});
it('warns when output channel not found', async () => {
const config = createMockConfig();
const emptyLookup = { get: vi.fn().mockReturnValue(undefined) };
watcher = new GmailWatcher(config, emptyLookup);
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
await watcher.send('some-peer', { text: 'test' });
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Output channel 'telegram' not found"),
);
warnSpy.mockRestore();
});
});
describe('onMessage', () => {
it('registers message handler', () => {
const config = createMockConfig();
watcher = new GmailWatcher(config, channelLookup);
const handler = vi.fn();
watcher.onMessage(handler);
// Verify handler is stored (indirectly, via processHistoryChanges)
expect((watcher as unknown as { messageHandler: unknown }).messageHandler).toBe(handler);
});
});
});
+422
View File
@@ -0,0 +1,422 @@
import { google, type Auth } from 'googleapis';
import { readFileSync, writeFileSync, existsSync, mkdirSync, chmodSync } from 'fs';
import { dirname, resolve } from 'path';
import { homedir } from 'os';
import type { GmailConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { parseInterval } from './heartbeat.js';
/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
/** Parsed email information used for template rendering. */
interface EmailInfo {
id: string;
from: string;
to: string;
subject: string;
snippet: string;
date: string;
labels: string[];
}
/** Pub/Sub push notification payload from Google. */
interface PubSubNotification {
emailAddress: string;
historyId: string;
}
// Google Cloud Pub/Sub topic for Gmail push notifications.
// This must be pre-configured in Google Cloud Console.
const GMAIL_PUBSUB_TOPIC = 'projects/flynn-agent/topics/gmail-push';
// Watch expires after ~7 days; renew at 6 days (in ms).
const WATCH_RENEWAL_MS = 6 * 24 * 60 * 60 * 1000;
/**
* GmailWatcher monitors a Gmail inbox for new messages and forwards them
* as InboundMessages via the channel adapter pattern.
*
* Supports two modes:
* - **Pub/Sub push**: Google sends push notifications when new emails arrive.
* Requires a POST /gmail/push route on the gateway.
* - **Polling fallback**: Periodically polls Gmail History API for changes.
*
* Authentication uses OAuth2 with a stored refresh token.
*/
export class GmailWatcher implements ChannelAdapter {
readonly name = 'gmail';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private oauth2Client?: Auth.OAuth2Client;
private lastHistoryId?: string;
private pollTimer?: ReturnType<typeof setInterval>;
private watchTimer?: ReturnType<typeof setInterval>;
private readonly config: NonNullable<GmailConfig>;
constructor(
config: NonNullable<GmailConfig>,
private readonly channelLookup: ChannelLookup,
) {
this.config = config;
}
get status(): ChannelStatus {
return this._status;
}
async connect(): Promise<void> {
this._status = 'connecting';
try {
this.oauth2Client = await this.authorize();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
console.error(`GmailWatcher: Authorization failed — ${errMsg}`);
console.error('GmailWatcher: Run "flynn gmail-auth" to set up Gmail credentials.');
this._status = 'error';
return;
}
// Set up Gmail push watch (Pub/Sub)
try {
await this.setupWatch();
} catch (error) {
const errMsg = error instanceof Error ? error.message : 'Unknown error';
console.warn(`GmailWatcher: Watch setup failed (will use polling only) — ${errMsg}`);
}
// Start polling fallback
const pollMs = parseInterval(this.config.poll_interval ?? '60s');
this.pollTimer = setInterval(() => {
this.pollForNewMessages().catch((err) => {
console.error('GmailWatcher: Poll error —', err instanceof Error ? err.message : err);
});
}, pollMs);
this._status = 'connected';
console.log(`GmailWatcher: Connected (poll_interval=${this.config.poll_interval ?? '60s'})`);
}
async disconnect(): Promise<void> {
if (this.pollTimer) {
clearInterval(this.pollTimer);
this.pollTimer = undefined;
}
if (this.watchTimer) {
clearInterval(this.watchTimer);
this.watchTimer = undefined;
}
this.oauth2Client = undefined;
this._status = 'disconnected';
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
// Route responses to the configured output channel
const outputAdapter = this.channelLookup.get(this.config.output.channel);
if (!outputAdapter) {
console.warn(`GmailWatcher: Output channel '${this.config.output.channel}' not found`);
return;
}
await outputAdapter.send(this.config.output.peer, message);
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/**
* Handle a Pub/Sub push notification from Google.
* Called by the gateway when POST /gmail/push is received.
* @param data Base64-encoded Pub/Sub message data
*/
async handlePushNotification(data: string): Promise<void> {
try {
const decoded = Buffer.from(data, 'base64').toString('utf-8');
const notification = JSON.parse(decoded) as PubSubNotification;
if (!notification.historyId) {
console.warn('GmailWatcher: Push notification missing historyId');
return;
}
// Only process if the new historyId is greater than our last known one
if (this.lastHistoryId && BigInt(notification.historyId) <= BigInt(this.lastHistoryId)) {
return;
}
if (this.lastHistoryId) {
await this.processHistoryChanges(this.lastHistoryId);
}
this.lastHistoryId = notification.historyId;
} catch (error) {
console.error('GmailWatcher: Push notification error —', error instanceof Error ? error.message : error);
}
}
/**
* Authorize with Gmail using stored OAuth2 credentials.
* Reads client credentials and stored token from config paths.
*/
private async authorize(): Promise<Auth.OAuth2Client> {
const credentialsPath = this.config.credentials_file;
if (!credentialsPath) {
throw new Error('No credentials_file configured. Set automation.gmail.credentials_file in config.');
}
const expandedCredsPath = this.expandPath(credentialsPath);
if (!existsSync(expandedCredsPath)) {
throw new Error(`Credentials file not found: ${expandedCredsPath}`);
}
const credentials = JSON.parse(readFileSync(expandedCredsPath, 'utf-8'));
const { client_id, client_secret, redirect_uris } = credentials.installed ?? credentials.web ?? {};
if (!client_id || !client_secret) {
throw new Error('Invalid credentials file — missing client_id or client_secret');
}
const oauth2Client = new google.auth.OAuth2(
client_id,
client_secret,
redirect_uris?.[0] ?? 'http://localhost',
);
// Load stored token
const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json');
if (!existsSync(tokenPath)) {
throw new Error(
`Token file not found: ${tokenPath}. Run "flynn gmail-auth" to authenticate.`,
);
}
const token = JSON.parse(readFileSync(tokenPath, 'utf-8'));
oauth2Client.setCredentials(token);
// Auto-save refreshed tokens
oauth2Client.on('tokens', (newTokens) => {
const merged = { ...token, ...newTokens };
this.saveToken(merged);
});
return oauth2Client;
}
/**
* Set up Gmail Pub/Sub watch for push notifications.
* Calls gmail.users.watch() and schedules renewal before expiry.
*/
private async setupWatch(): Promise<void> {
if (!this.oauth2Client) return;
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
const watchResponse = await gmail.users.watch({
userId: 'me',
requestBody: {
labelIds: this.config.watch_labels ?? ['INBOX'],
topicName: GMAIL_PUBSUB_TOPIC,
},
});
if (watchResponse.data.historyId) {
this.lastHistoryId = watchResponse.data.historyId.toString();
}
console.log(`GmailWatcher: Watch registered (historyId=${this.lastHistoryId})`);
// Schedule renewal before watch expiry (~7 days)
this.watchTimer = setInterval(() => {
this.setupWatch().catch((err) => {
console.error('GmailWatcher: Watch renewal failed —', err instanceof Error ? err.message : err);
});
}, WATCH_RENEWAL_MS);
}
/**
* Poll Gmail History API for new messages since lastHistoryId.
* Fallback mechanism when Pub/Sub push is not available.
*/
private async pollForNewMessages(): Promise<void> {
if (!this.oauth2Client) return;
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
// If no historyId yet, initialize it from the profile
if (!this.lastHistoryId) {
try {
const profile = await gmail.users.getProfile({ userId: 'me' });
if (profile.data.historyId) {
this.lastHistoryId = profile.data.historyId.toString();
}
} catch (error) {
console.error('GmailWatcher: Failed to get profile —', error instanceof Error ? error.message : error);
}
return; // First poll — just establish the baseline
}
await this.processHistoryChanges(this.lastHistoryId);
}
/**
* Fetch history changes since the given historyId and process new messages.
* Updates lastHistoryId to the latest value from the response.
*/
private async processHistoryChanges(startHistoryId: string): Promise<void> {
if (!this.oauth2Client) return;
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
try {
const historyResponse = await gmail.users.history.list({
userId: 'me',
startHistoryId: startHistoryId,
labelId: (this.config.watch_labels ?? ['INBOX'])[0],
historyTypes: ['messageAdded'],
});
const history = historyResponse.data.history ?? [];
const processedIds = new Set<string>();
for (const record of history) {
const addedMessages = record.messagesAdded ?? [];
for (const added of addedMessages) {
const messageId = added.message?.id;
if (!messageId || processedIds.has(messageId)) continue;
processedIds.add(messageId);
const email = await this.getMessageDetails(messageId);
if (!email) continue;
// Skip messages before history_start if configured
if (this.config.history_start) {
const emailDate = new Date(email.date);
const startDate = new Date(this.config.history_start);
if (emailDate < startDate) continue;
}
const text = this.renderTemplate(email);
const msg: InboundMessage = {
id: `gmail-${email.id}-${Date.now()}`,
channel: 'gmail',
senderId: email.from,
senderName: `gmail:${email.from}`,
text,
timestamp: Date.now(),
metadata: {
emailId: email.id,
from: email.from,
to: email.to,
subject: email.subject,
labels: email.labels,
},
};
this.messageHandler?.(msg);
}
}
// Update historyId to the latest
if (historyResponse.data.historyId) {
this.lastHistoryId = historyResponse.data.historyId.toString();
}
} catch (error: unknown) {
// 404 means historyId is too old — reset by fetching profile
if (error instanceof Error && 'code' in error && (error as { code: number }).code === 404) {
console.warn('GmailWatcher: History expired, re-syncing...');
try {
const profile = await gmail.users.getProfile({ userId: 'me' });
if (profile.data.historyId) {
this.lastHistoryId = profile.data.historyId.toString();
}
} catch (profileError) {
console.error('GmailWatcher: Failed to re-sync profile —', profileError instanceof Error ? profileError.message : profileError);
}
} else {
throw error;
}
}
}
/**
* Fetch full message details by ID and extract relevant headers.
*/
private async getMessageDetails(messageId: string): Promise<EmailInfo | null> {
if (!this.oauth2Client) return null;
const gmail = google.gmail({ version: 'v1', auth: this.oauth2Client });
try {
const msg = await gmail.users.messages.get({
userId: 'me',
id: messageId,
format: 'metadata',
metadataHeaders: ['From', 'To', 'Subject', 'Date'],
});
const headers = msg.data.payload?.headers ?? [];
const getHeader = (name: string): string =>
headers.find(h => h.name?.toLowerCase() === name.toLowerCase())?.value ?? '';
return {
id: messageId,
from: getHeader('From'),
to: getHeader('To'),
subject: getHeader('Subject'),
snippet: msg.data.snippet ?? '',
date: getHeader('Date'),
labels: msg.data.labelIds ?? [],
};
} catch (error) {
console.error(`GmailWatcher: Failed to fetch message ${messageId}`, error instanceof Error ? error.message : error);
return null;
}
}
/**
* Render the message template with email placeholders.
* Supported: {{from}}, {{to}}, {{subject}}, {{snippet}}, {{date}}, {{id}}, {{labels}}
*/
renderTemplate(email: EmailInfo): string {
return this.config.message
.replace(/\{\{from\}\}/g, email.from)
.replace(/\{\{to\}\}/g, email.to)
.replace(/\{\{subject\}\}/g, email.subject)
.replace(/\{\{snippet\}\}/g, email.snippet)
.replace(/\{\{date\}\}/g, email.date)
.replace(/\{\{id\}\}/g, email.id)
.replace(/\{\{labels\}\}/g, email.labels.join(', '));
}
/**
* Expand ~ to the user's home directory.
*/
expandPath(p: string): string {
if (p.startsWith('~/') || p === '~') {
return resolve(homedir(), p.slice(2));
}
return resolve(p);
}
/**
* Save token to disk with restrictive permissions (0o600).
*/
private saveToken(token: unknown): void {
const tokenPath = this.expandPath(this.config.token_file ?? '~/.config/flynn/gmail-token.json');
const dir = dirname(tokenPath);
if (!existsSync(dir)) {
mkdirSync(dir, { recursive: true });
}
writeFileSync(tokenPath, JSON.stringify(token, null, 2), 'utf-8');
try {
chmodSync(tokenPath, 0o600);
} catch {
// chmod may fail on some filesystems — not critical
}
}
}
+1
View File
@@ -1,4 +1,5 @@
export { CronScheduler } from './cron.js'; export { CronScheduler } from './cron.js';
export { WebhookHandler } from './webhooks.js'; export { WebhookHandler } from './webhooks.js';
export { GmailWatcher } from './gmail.js';
export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; export { HeartbeatMonitor, parseInterval } from './heartbeat.js';
export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js';
+16
View File
@@ -119,6 +119,20 @@ const webhookSchema = z.object({
enabled: z.boolean().default(true), enabled: z.boolean().default(true),
}); });
const gmailSchema = z.object({
enabled: z.boolean().default(false),
credentials_file: z.string().optional(),
token_file: z.string().default('~/.config/flynn/gmail-token.json'),
watch_labels: z.array(z.string()).default(['INBOX']),
poll_interval: z.string().default('60s'),
history_start: z.string().optional(), // ISO date string — only process emails after this date
output: z.object({
channel: z.string().min(1),
peer: z.string().min(1),
}),
message: z.string().default('New email from {{from}}: {{subject}}\n\n{{snippet}}'),
}).optional();
const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']); const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']);
const heartbeatSchema = z.object({ const heartbeatSchema = z.object({
@@ -136,6 +150,7 @@ const heartbeatSchema = z.object({
const automationSchema = z.object({ const automationSchema = z.object({
cron: z.array(cronJobSchema).default([]), cron: z.array(cronJobSchema).default([]),
webhooks: z.array(webhookSchema).default([]), webhooks: z.array(webhookSchema).default([]),
gmail: gmailSchema,
heartbeat: heartbeatSchema, heartbeat: heartbeatSchema,
}).default({}); }).default({});
@@ -343,6 +358,7 @@ export type TelegramConfig = z.infer<typeof telegramSchema>;
export type ModelConfig = z.infer<typeof modelConfigSchema>; export type ModelConfig = z.infer<typeof modelConfigSchema>;
export type CronJobConfig = z.infer<typeof cronJobSchema>; export type CronJobConfig = z.infer<typeof cronJobSchema>;
export type WebhookConfig = z.infer<typeof webhookSchema>; export type WebhookConfig = z.infer<typeof webhookSchema>;
export type GmailConfig = z.infer<typeof gmailSchema>;
export type AgentsConfig = z.infer<typeof agentsSchema>; export type AgentsConfig = z.infer<typeof agentsSchema>;
export type CompactionConfig = z.infer<typeof compactionSchema>; export type CompactionConfig = z.infer<typeof compactionSchema>;
export type MemoryConfig = z.infer<typeof memorySchema>; export type MemoryConfig = z.infer<typeof memorySchema>;
+10 -1
View File
@@ -17,7 +17,7 @@ import type { EmbeddingProvider as EmbeddingProviderInterface } from '../memory/
import { createMemoryTools } from '../tools/builtin/index.js'; import { createMemoryTools } from '../tools/builtin/index.js';
import { GatewayServer } from '../gateway/index.js'; import { GatewayServer } from '../gateway/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js';
import { CronScheduler, WebhookHandler, HeartbeatMonitor } from '../automation/index.js'; import { CronScheduler, WebhookHandler, HeartbeatMonitor, GmailWatcher } from '../automation/index.js';
import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js';
import { McpManager } from '../mcp/index.js'; import { McpManager } from '../mcp/index.js';
import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js';
@@ -884,6 +884,15 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
console.log(`Registered ${config.automation.webhooks.length} webhook(s)`); console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);
} }
// Register Gmail watcher adapter (if configured and enabled)
let gmailWatcher: GmailWatcher | undefined;
if (config.automation.gmail?.enabled) {
gmailWatcher = new GmailWatcher(config.automation.gmail, channelRegistry);
channelRegistry.register(gmailWatcher);
gateway.setGmailHandler(gmailWatcher);
console.log('Registered Gmail watcher');
}
// ── Register Tier 1 agent tools ───────────────────────────── // ── Register Tier 1 agent tools ─────────────────────────────
// Session management tools (list, history, create, delete) // Session management tools (list, history, create, delete)
+37
View File
@@ -25,6 +25,7 @@ import type { Config } from '../config/index.js';
import type { ToolRegistry } from '../tools/registry.js'; import type { ToolRegistry } from '../tools/registry.js';
import type { ToolExecutor } from '../tools/executor.js'; import type { ToolExecutor } from '../tools/executor.js';
import type { WebhookHandler } from '../automation/webhooks.js'; import type { WebhookHandler } from '../automation/webhooks.js';
import type { GmailWatcher } from '../automation/gmail.js';
export interface GatewayServerConfig { export interface GatewayServerConfig {
port: number; port: number;
@@ -45,6 +46,8 @@ export interface GatewayServerConfig {
channelRegistry?: { list(): Array<{ readonly name: string; readonly status: string }> }; channelRegistry?: { list(): Array<{ readonly name: string; readonly status: string }> };
/** Optional webhook handler for inbound webhook HTTP routes. */ /** Optional webhook handler for inbound webhook HTTP routes. */
webhookHandler?: WebhookHandler; webhookHandler?: WebhookHandler;
/** Optional Gmail handler for Pub/Sub push notifications. */
gmailHandler?: GmailWatcher;
} }
export class GatewayServer { export class GatewayServer {
@@ -223,6 +226,25 @@ export class GatewayServer {
} }
} }
// Gmail Pub/Sub push route — bypass gateway auth (Google sends push notifications directly)
if (this.config.gmailHandler && req.method === 'POST' && req.url?.startsWith('/gmail/push')) {
try {
const body = await this.readRequestBody(req);
const parsed = JSON.parse(body) as { message?: { data?: string } };
const data = parsed?.message?.data;
if (data) {
await this.config.gmailHandler.handlePushNotification(data);
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
} catch (err) {
console.error('Gmail push handler error:', err instanceof Error ? err.message : err);
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid request' }));
}
return;
}
// Apply auth to HTTP requests when configured // Apply auth to HTTP requests when configured
const authConfig = this.config.auth ?? {}; const authConfig = this.config.auth ?? {};
if (this.config.authHttp !== false && authConfig.token) { if (this.config.authHttp !== false && authConfig.token) {
@@ -299,4 +321,19 @@ export class GatewayServer {
setWebhookHandler(handler: WebhookHandler): void { setWebhookHandler(handler: WebhookHandler): void {
this.config.webhookHandler = handler; this.config.webhookHandler = handler;
} }
/** Set the Gmail handler for Pub/Sub push notifications (late binding). */
setGmailHandler(handler: GmailWatcher): void {
this.config.gmailHandler = handler;
}
/** Read the full request body as a string. */
private readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on('data', (chunk: Buffer) => chunks.push(chunk));
req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
req.on('error', reject);
});
}
} }