feat(telegram): harden channel reliability with retries and error diagnostics

This commit is contained in:
William Valentin
2026-02-18 13:12:11 -08:00
parent 0ccbe65d3f
commit 42ae4a75df
11 changed files with 323 additions and 44 deletions
+57 -3
View File
@@ -7,6 +7,7 @@ const mockOn = vi.fn();
const mockCommand = vi.fn();
const mockStart = vi.fn();
const mockStop = vi.fn();
const mockCatch = vi.fn();
const mockSendMessage = vi.fn();
vi.mock('grammy', () => ({
@@ -14,6 +15,7 @@ vi.mock('grammy', () => ({
use: mockUse,
on: mockOn,
command: mockCommand,
catch: mockCatch,
start: mockStart,
stop: mockStop,
api: { sendMessage: mockSendMessage },
@@ -78,6 +80,7 @@ describe('TelegramAdapter', () => {
// .use() for auth middleware
expect(mockUse).toHaveBeenCalledTimes(1);
expect(mockCatch).toHaveBeenCalledTimes(1);
// .command() for /start, /reset, /model, /local, /cloud, /transfer
expect(mockCommand).toHaveBeenCalledTimes(6);
expect(mockCommand.mock.calls[0][0]).toBe('start');
@@ -329,7 +332,7 @@ describe('TelegramAdapter', () => {
// ── Group chat mention gating ──────────────────────────────────
it('ignores group messages without mention when requireMention is true', async () => {
it('processes group messages without mention by default', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
@@ -350,8 +353,8 @@ describe('TelegramAdapter', () => {
await textHandler(ctx);
expect(handler).not.toHaveBeenCalled();
expect(ctx.replyWithChatAction).not.toHaveBeenCalled();
expect(handler).toHaveBeenCalledTimes(1);
expect(ctx.replyWithChatAction).toHaveBeenCalledWith('typing');
});
it('processes group messages with bot mention when requireMention is true', async () => {
@@ -440,6 +443,34 @@ describe('TelegramAdapter', () => {
expect(msg.text).toBe('Hello everyone');
});
it('ignores group messages without mention when requireMention is true', async () => {
const mentionAdapter = new TelegramAdapter({
...baseConfig,
requireMention: true,
});
const handler = vi.fn();
mentionAdapter.onMessage(handler);
await mentionAdapter.connect();
const startCall = mockStart.mock.calls[0][0];
startCall.onStart({ id: 12345, username: 'flynn_bot' });
const textHandler = getOnHandler('message:text');
const ctx = {
message: { message_id: 42, text: 'Hello everyone', reply_to_message: undefined },
chat: { id: 100, type: 'group' },
from: { first_name: 'Will' },
replyWithChatAction: vi.fn(),
};
await textHandler(ctx);
expect(handler).not.toHaveBeenCalled();
expect(ctx.replyWithChatAction).not.toHaveBeenCalled();
});
it('DMs are always processed regardless of requireMention setting', async () => {
const handler = vi.fn();
adapter.onMessage(handler);
@@ -491,4 +522,27 @@ describe('TelegramAdapter', () => {
const msg: InboundMessage = handler.mock.calls[0][0];
expect(msg.text).toBe('tell me a joke');
});
it('retries transient telegram send failures', async () => {
await adapter.connect();
const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0);
mockSendMessage
.mockRejectedValueOnce({ error_code: 429, description: 'Too Many Requests', parameters: { retry_after: 0 } })
.mockResolvedValueOnce(undefined);
await adapter.send('100', { text: 'retry me' });
expect(mockSendMessage).toHaveBeenCalledTimes(2);
randomSpy.mockRestore();
});
it('records adapter errors from grammy catch hook', async () => {
await adapter.connect();
const catchHandler = mockCatch.mock.calls[0][0] as (err: unknown) => void;
catchHandler(new Error('poll failed'));
expect(adapter.status).toBe('error');
expect(adapter.lastError).toContain('poll failed');
expect(typeof adapter.lastErrorAt).toBe('number');
});
});
+202 -36
View File
@@ -17,7 +17,7 @@ import type { PairingManager } from '../pairing.js';
export interface TelegramAdapterConfig {
botToken: string;
allowedChatIds: number[];
/** Require bot mention or reply-to-bot to respond in group chats (default: true). */
/** Require bot mention or reply-to-bot to respond in group chats (default: false). */
requireMention?: boolean;
hookEngine?: HookEngine;
/** Optional pairing manager for DM pairing codes. */
@@ -39,11 +39,24 @@ export class TelegramAdapter implements ChannelAdapter {
private messageHandler?: (msg: InboundMessage) => void;
private config: TelegramAdapterConfig;
private botInfo?: { id: number; username?: string };
private _lastError?: string;
private _lastErrorAt?: number;
private shouldRun = false;
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempt = 0;
get status(): ChannelStatus {
return this._status;
}
get lastError(): string | undefined {
return this._lastError;
}
get lastErrorAt(): number | undefined {
return this._lastErrorAt;
}
constructor(config: TelegramAdapterConfig) {
this.config = config;
}
@@ -75,11 +88,27 @@ export class TelegramAdapter implements ChannelAdapter {
/** Create the grammy bot, wire up middleware & handlers, and start long-polling. */
async connect(): Promise<void> {
this.bot = new Bot(this.config.botToken);
if (this._status === 'connected' || this._status === 'connecting') {
return;
}
this.shouldRun = true;
this.clearReconnectTimer();
this._status = 'connecting';
this._lastError = undefined;
this._lastErrorAt = undefined;
const bot = new Bot(this.config.botToken);
this.bot = bot;
bot.catch((error) => {
const description = error instanceof Error ? error.message : String(error);
this.recordAdapterError(`Telegram polling error: ${description}`);
this.scheduleReconnect();
});
// ── Auth middleware — reject messages from unknown chats (with pairing fallback) ──
this.bot.use(async (ctx, next) => {
bot.use(async (ctx, next) => {
const chatId = ctx.chat?.id;
if (chatId === undefined) {return;}
@@ -112,7 +141,7 @@ export class TelegramAdapter implements ChannelAdapter {
if (this.config.hookEngine) {
const hookEngine = this.config.hookEngine;
this.bot.on('callback_query:data', async (ctx) => {
bot.on('callback_query:data', async (ctx) => {
const data = ctx.callbackQuery.data;
const parsed = parseConfirmationCallback(data);
@@ -142,11 +171,11 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Command handlers ──
this.bot.command('start', async (ctx) => {
bot.command('start', async (ctx) => {
await ctx.reply('Flynn is ready. Send me a message!');
});
this.bot.command('reset', async (ctx) => {
bot.command('reset', async (ctx) => {
// Deliver a special reset message through the channel
if (this.messageHandler) {
this.messageHandler({
@@ -162,7 +191,7 @@ export class TelegramAdapter implements ChannelAdapter {
await ctx.reply('Conversation reset.');
});
this.bot.command('model', async (ctx) => {
bot.command('model', async (ctx) => {
if (!this.messageHandler) {return;}
// Telegram can deliver group commands in the form: /model@bot_username ...
@@ -184,7 +213,7 @@ export class TelegramAdapter implements ChannelAdapter {
});
});
this.bot.command('local', async (ctx) => {
bot.command('local', async (ctx) => {
if (!this.messageHandler) {return;}
this.messageHandler({
id: String(ctx.message?.message_id ?? Date.now()),
@@ -197,7 +226,7 @@ export class TelegramAdapter implements ChannelAdapter {
});
});
this.bot.command('cloud', async (ctx) => {
bot.command('cloud', async (ctx) => {
if (!this.messageHandler) {return;}
this.messageHandler({
id: String(ctx.message?.message_id ?? Date.now()),
@@ -210,7 +239,7 @@ export class TelegramAdapter implements ChannelAdapter {
});
});
this.bot.command('transfer', async (ctx) => {
bot.command('transfer', async (ctx) => {
if (!this.messageHandler) {return;}
// Telegram can deliver group commands in the form: /transfer@bot_username ...
@@ -234,12 +263,12 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Text message handler ──
this.bot.on('message:text', async (ctx) => {
bot.on('message:text', async (ctx) => {
if (!this.messageHandler) {return;}
// Group chat mention gating
const isGroup = ctx.chat.type === 'group' || ctx.chat.type === 'supergroup';
const requireMention = this.config.requireMention ?? true;
const requireMention = this.config.requireMention ?? false;
if (isGroup && requireMention && this.botInfo) {
const rawText = ctx.message.text;
@@ -280,7 +309,7 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Photo message handler ──
this.bot.on('message:photo', async (ctx) => {
bot.on('message:photo', async (ctx) => {
if (!this.messageHandler) {return;}
const photo = ctx.message.photo;
@@ -318,7 +347,7 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Image document handler ──
this.bot.on('message:document', async (ctx) => {
bot.on('message:document', async (ctx) => {
if (!this.messageHandler) {return;}
const document = ctx.message.document;
@@ -358,7 +387,7 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Voice message handler ──
this.bot.on('message:voice', async (ctx) => {
bot.on('message:voice', async (ctx) => {
if (!this.messageHandler) {return;}
const voice = ctx.message.voice;
@@ -396,7 +425,7 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Audio message handler ──
this.bot.on('message:audio', async (ctx) => {
bot.on('message:audio', async (ctx) => {
if (!this.messageHandler) {return;}
const audio = ctx.message.audio;
@@ -434,22 +463,33 @@ export class TelegramAdapter implements ChannelAdapter {
// ── Start long polling ──
this.bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot started: @${botInfo.username}`);
this.botInfo = { id: botInfo.id, username: botInfo.username };
try {
await bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot started: @${botInfo.username}`);
this.botInfo = { id: botInfo.id, username: botInfo.username };
this._status = 'connected';
this.reconnectAttempt = 0;
this._lastError = undefined;
this._lastErrorAt = undefined;
},
});
if (this._status === 'connecting') {
// For mocked/runtime environments where onStart is not surfaced synchronously.
this._status = 'connected';
},
});
// bot.start() returns immediately for long polling.
// The onStart callback sets connected above; also set here for safety
// in case the callback fires before this line is reached.
this._status = 'connected';
}
} catch (error) {
const description = error instanceof Error ? error.message : String(error);
this.recordAdapterError(`Telegram connect failed: ${description}`);
this.scheduleReconnect();
throw error;
}
}
/** Stop the bot and clean up. */
async disconnect(): Promise<void> {
this.shouldRun = false;
this.clearReconnectTimer();
if (this.bot) {
await this.bot.stop();
this.bot = null;
@@ -481,7 +521,10 @@ export class TelegramAdapter implements ChannelAdapter {
// is strict and can fail on unescaped characters. If Telegram rejects the
// message, retry once without parse_mode so users still get the content.
try {
await bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' });
await this.sendWithRetry(
() => bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' }),
`sendMessage(markdown) chat=${chatId}`,
);
} catch (error) {
const description = error && typeof error === 'object' && 'description' in error
? String((error as { description?: unknown }).description)
@@ -494,7 +537,10 @@ export class TelegramAdapter implements ChannelAdapter {
throw error;
}
await bot.api.sendMessage(chatId, chunk);
await this.sendWithRetry(
() => bot.api.sendMessage(chatId, chunk),
`sendMessage(plain) chat=${chatId}`,
);
}
};
@@ -518,23 +564,143 @@ export class TelegramAdapter implements ChannelAdapter {
/** Send a single outbound attachment via the Telegram API. */
private async sendAttachment(chatId: number, attachment: OutboundAttachment): Promise<void> {
if (!this.bot) {return;}
const bot = this.bot;
if (!bot) {return;}
const file = attachment.data
? new InputFile(Buffer.from(attachment.data, 'base64'), attachment.filename)
: attachment.url ?? '';
try {
const file = attachment.data
? new InputFile(Buffer.from(attachment.data, 'base64'), attachment.filename)
: attachment.url ?? '';
if (attachment.mimeType.startsWith('image/')) {
await this.bot.api.sendPhoto(chatId, file);
await this.sendWithRetry(() => bot.api.sendPhoto(chatId, file), `sendPhoto chat=${chatId}`);
} else {
await this.bot.api.sendDocument(chatId, file);
await this.sendWithRetry(() => bot.api.sendDocument(chatId, file), `sendDocument chat=${chatId}`);
}
} catch (error) {
this.recordAdapterError(`Telegram attachment send failed: ${error instanceof Error ? error.message : String(error)}`);
console.error(
`Failed to send ${attachment.mimeType} attachment to ${chatId}:`,
error instanceof Error ? error.message : 'Unknown error',
);
throw error;
}
}
private recordAdapterError(message: string): void {
this._status = 'error';
this._lastError = message;
this._lastErrorAt = Date.now();
}
private clearReconnectTimer(): void {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
private scheduleReconnect(): void {
if (!this.shouldRun || !this.bot) {
return;
}
if (this.reconnectTimer) {
return;
}
const attempt = this.reconnectAttempt + 1;
this.reconnectAttempt = attempt;
const delayMs = Math.min(
30_000,
attempt <= 1 ? 1_000 : attempt === 2 ? 2_000 : attempt === 3 ? 5_000 : 10_000,
);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
void this.reconnect().catch((error) => {
this.recordAdapterError(`Telegram reconnect failed: ${error instanceof Error ? error.message : String(error)}`);
this.scheduleReconnect();
});
}, delayMs);
}
private async reconnect(): Promise<void> {
const bot = this.bot;
if (!this.shouldRun || !bot) {
return;
}
this._status = 'connecting';
await bot.stop();
await bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot reconnected: @${botInfo.username}`);
this.botInfo = { id: botInfo.id, username: botInfo.username };
this._status = 'connected';
this.reconnectAttempt = 0;
this._lastError = undefined;
this._lastErrorAt = undefined;
},
});
if (this._status === 'connecting') {
this._status = 'connected';
}
}
private parseTelegramError(error: unknown): {
code?: number;
description?: string;
retryAfterSec?: number;
message: string;
transient: boolean;
} {
const code = typeof error === 'object' && error !== null && 'error_code' in error
? Number((error as { error_code?: unknown }).error_code)
: undefined;
const description = typeof error === 'object' && error !== null && 'description' in error
? String((error as { description?: unknown }).description)
: undefined;
const retryAfterRaw = typeof error === 'object' && error !== null && 'parameters' in error
? (error as { parameters?: { retry_after?: unknown } }).parameters?.retry_after
: undefined;
const retryAfterSec = Number.isFinite(Number(retryAfterRaw)) ? Number(retryAfterRaw) : undefined;
const message = error instanceof Error
? error.message
: description ?? String(error);
const normalized = message.toLowerCase();
const transient = code === 429
|| (typeof code === 'number' && code >= 500)
|| normalized.includes('timeout')
|| normalized.includes('timed out')
|| normalized.includes('fetch failed')
|| normalized.includes('econnreset')
|| normalized.includes('enotfound')
|| normalized.includes('network');
return { code, description, retryAfterSec, message, transient };
}
private async sendWithRetry<T>(fn: () => Promise<T>, context: string): Promise<T> {
const maxAttempts = 3;
let attempt = 0;
while (true) {
attempt += 1;
try {
return await fn();
} catch (error) {
const parsed = this.parseTelegramError(error);
if (!parsed.transient || attempt >= maxAttempts) {
this.recordAdapterError(`Telegram API failure (${context}): ${parsed.message}`);
throw error;
}
const baseDelay = parsed.retryAfterSec
? parsed.retryAfterSec * 1000
: Math.min(8000, 800 * (2 ** (attempt - 1)));
const jitter = Math.floor(Math.random() * 250);
await new Promise((resolve) => setTimeout(resolve, baseDelay + jitter));
}
}
}
}
+6
View File
@@ -85,6 +85,12 @@ export interface ChannelAdapter {
/** Current connection status. */
readonly status: ChannelStatus;
/** Optional last adapter error message (for diagnostics surfaces). */
readonly lastError?: string;
/** Unix ms timestamp for last adapter error. */
readonly lastErrorAt?: number;
/** Start the adapter (connect to platform, begin listening). */
connect(): Promise<void>;