diff --git a/src/channels/telegram/adapter.test.ts b/src/channels/telegram/adapter.test.ts index b967f3d..0e8c4c0 100644 --- a/src/channels/telegram/adapter.test.ts +++ b/src/channels/telegram/adapter.test.ts @@ -7,6 +7,7 @@ const mockOn = vi.fn(); const mockCommand = vi.fn(); const mockStart = vi.fn(); const mockStop = vi.fn(); +const mockCatch = vi.fn(); const mockSendMessage = vi.fn(); vi.mock('grammy', () => ({ @@ -14,6 +15,7 @@ vi.mock('grammy', () => ({ use: mockUse, on: mockOn, command: mockCommand, + catch: mockCatch, start: mockStart, stop: mockStop, api: { sendMessage: mockSendMessage }, @@ -78,6 +80,7 @@ describe('TelegramAdapter', () => { // .use() for auth middleware expect(mockUse).toHaveBeenCalledTimes(1); + expect(mockCatch).toHaveBeenCalledTimes(1); // .command() for /start, /reset, /model, /local, /cloud, /transfer expect(mockCommand).toHaveBeenCalledTimes(6); expect(mockCommand.mock.calls[0][0]).toBe('start'); @@ -329,7 +332,7 @@ describe('TelegramAdapter', () => { // ── Group chat mention gating ────────────────────────────────── - it('ignores group messages without mention when requireMention is true', async () => { + it('processes group messages without mention by default', async () => { const handler = vi.fn(); adapter.onMessage(handler); @@ -350,8 +353,8 @@ describe('TelegramAdapter', () => { await textHandler(ctx); - expect(handler).not.toHaveBeenCalled(); - expect(ctx.replyWithChatAction).not.toHaveBeenCalled(); + expect(handler).toHaveBeenCalledTimes(1); + expect(ctx.replyWithChatAction).toHaveBeenCalledWith('typing'); }); it('processes group messages with bot mention when requireMention is true', async () => { @@ -440,6 +443,34 @@ describe('TelegramAdapter', () => { expect(msg.text).toBe('Hello everyone'); }); + it('ignores group messages without mention when requireMention is true', async () => { + const mentionAdapter = new TelegramAdapter({ + ...baseConfig, + requireMention: true, + }); + const handler = vi.fn(); + mentionAdapter.onMessage(handler); + + await mentionAdapter.connect(); + + const startCall = mockStart.mock.calls[0][0]; + startCall.onStart({ id: 12345, username: 'flynn_bot' }); + + const textHandler = getOnHandler('message:text'); + + const ctx = { + message: { message_id: 42, text: 'Hello everyone', reply_to_message: undefined }, + chat: { id: 100, type: 'group' }, + from: { first_name: 'Will' }, + replyWithChatAction: vi.fn(), + }; + + await textHandler(ctx); + + expect(handler).not.toHaveBeenCalled(); + expect(ctx.replyWithChatAction).not.toHaveBeenCalled(); + }); + it('DMs are always processed regardless of requireMention setting', async () => { const handler = vi.fn(); adapter.onMessage(handler); @@ -491,4 +522,27 @@ describe('TelegramAdapter', () => { const msg: InboundMessage = handler.mock.calls[0][0]; expect(msg.text).toBe('tell me a joke'); }); + + it('retries transient telegram send failures', async () => { + await adapter.connect(); + const randomSpy = vi.spyOn(Math, 'random').mockReturnValue(0); + mockSendMessage + .mockRejectedValueOnce({ error_code: 429, description: 'Too Many Requests', parameters: { retry_after: 0 } }) + .mockResolvedValueOnce(undefined); + + await adapter.send('100', { text: 'retry me' }); + + expect(mockSendMessage).toHaveBeenCalledTimes(2); + randomSpy.mockRestore(); + }); + + it('records adapter errors from grammy catch hook', async () => { + await adapter.connect(); + const catchHandler = mockCatch.mock.calls[0][0] as (err: unknown) => void; + catchHandler(new Error('poll failed')); + + expect(adapter.status).toBe('error'); + expect(adapter.lastError).toContain('poll failed'); + expect(typeof adapter.lastErrorAt).toBe('number'); + }); }); diff --git a/src/channels/telegram/adapter.ts b/src/channels/telegram/adapter.ts index 3f2ae9a..dd75356 100644 --- a/src/channels/telegram/adapter.ts +++ b/src/channels/telegram/adapter.ts @@ -17,7 +17,7 @@ import type { PairingManager } from '../pairing.js'; export interface TelegramAdapterConfig { botToken: string; allowedChatIds: number[]; - /** Require bot mention or reply-to-bot to respond in group chats (default: true). */ + /** Require bot mention or reply-to-bot to respond in group chats (default: false). */ requireMention?: boolean; hookEngine?: HookEngine; /** Optional pairing manager for DM pairing codes. */ @@ -39,11 +39,24 @@ export class TelegramAdapter implements ChannelAdapter { private messageHandler?: (msg: InboundMessage) => void; private config: TelegramAdapterConfig; private botInfo?: { id: number; username?: string }; + private _lastError?: string; + private _lastErrorAt?: number; + private shouldRun = false; + private reconnectTimer: NodeJS.Timeout | null = null; + private reconnectAttempt = 0; get status(): ChannelStatus { return this._status; } + get lastError(): string | undefined { + return this._lastError; + } + + get lastErrorAt(): number | undefined { + return this._lastErrorAt; + } + constructor(config: TelegramAdapterConfig) { this.config = config; } @@ -75,11 +88,27 @@ export class TelegramAdapter implements ChannelAdapter { /** Create the grammy bot, wire up middleware & handlers, and start long-polling. */ async connect(): Promise { - this.bot = new Bot(this.config.botToken); + if (this._status === 'connected' || this._status === 'connecting') { + return; + } + + this.shouldRun = true; + this.clearReconnectTimer(); this._status = 'connecting'; + this._lastError = undefined; + this._lastErrorAt = undefined; + + const bot = new Bot(this.config.botToken); + this.bot = bot; + + bot.catch((error) => { + const description = error instanceof Error ? error.message : String(error); + this.recordAdapterError(`Telegram polling error: ${description}`); + this.scheduleReconnect(); + }); // ── Auth middleware — reject messages from unknown chats (with pairing fallback) ── - this.bot.use(async (ctx, next) => { + bot.use(async (ctx, next) => { const chatId = ctx.chat?.id; if (chatId === undefined) {return;} @@ -112,7 +141,7 @@ export class TelegramAdapter implements ChannelAdapter { if (this.config.hookEngine) { const hookEngine = this.config.hookEngine; - this.bot.on('callback_query:data', async (ctx) => { + bot.on('callback_query:data', async (ctx) => { const data = ctx.callbackQuery.data; const parsed = parseConfirmationCallback(data); @@ -142,11 +171,11 @@ export class TelegramAdapter implements ChannelAdapter { // ── Command handlers ── - this.bot.command('start', async (ctx) => { + bot.command('start', async (ctx) => { await ctx.reply('Flynn is ready. Send me a message!'); }); - this.bot.command('reset', async (ctx) => { + bot.command('reset', async (ctx) => { // Deliver a special reset message through the channel if (this.messageHandler) { this.messageHandler({ @@ -162,7 +191,7 @@ export class TelegramAdapter implements ChannelAdapter { await ctx.reply('Conversation reset.'); }); - this.bot.command('model', async (ctx) => { + bot.command('model', async (ctx) => { if (!this.messageHandler) {return;} // Telegram can deliver group commands in the form: /model@bot_username ... @@ -184,7 +213,7 @@ export class TelegramAdapter implements ChannelAdapter { }); }); - this.bot.command('local', async (ctx) => { + bot.command('local', async (ctx) => { if (!this.messageHandler) {return;} this.messageHandler({ id: String(ctx.message?.message_id ?? Date.now()), @@ -197,7 +226,7 @@ export class TelegramAdapter implements ChannelAdapter { }); }); - this.bot.command('cloud', async (ctx) => { + bot.command('cloud', async (ctx) => { if (!this.messageHandler) {return;} this.messageHandler({ id: String(ctx.message?.message_id ?? Date.now()), @@ -210,7 +239,7 @@ export class TelegramAdapter implements ChannelAdapter { }); }); - this.bot.command('transfer', async (ctx) => { + bot.command('transfer', async (ctx) => { if (!this.messageHandler) {return;} // Telegram can deliver group commands in the form: /transfer@bot_username ... @@ -234,12 +263,12 @@ export class TelegramAdapter implements ChannelAdapter { // ── Text message handler ── - this.bot.on('message:text', async (ctx) => { + bot.on('message:text', async (ctx) => { if (!this.messageHandler) {return;} // Group chat mention gating const isGroup = ctx.chat.type === 'group' || ctx.chat.type === 'supergroup'; - const requireMention = this.config.requireMention ?? true; + const requireMention = this.config.requireMention ?? false; if (isGroup && requireMention && this.botInfo) { const rawText = ctx.message.text; @@ -280,7 +309,7 @@ export class TelegramAdapter implements ChannelAdapter { // ── Photo message handler ── - this.bot.on('message:photo', async (ctx) => { + bot.on('message:photo', async (ctx) => { if (!this.messageHandler) {return;} const photo = ctx.message.photo; @@ -318,7 +347,7 @@ export class TelegramAdapter implements ChannelAdapter { // ── Image document handler ── - this.bot.on('message:document', async (ctx) => { + bot.on('message:document', async (ctx) => { if (!this.messageHandler) {return;} const document = ctx.message.document; @@ -358,7 +387,7 @@ export class TelegramAdapter implements ChannelAdapter { // ── Voice message handler ── - this.bot.on('message:voice', async (ctx) => { + bot.on('message:voice', async (ctx) => { if (!this.messageHandler) {return;} const voice = ctx.message.voice; @@ -396,7 +425,7 @@ export class TelegramAdapter implements ChannelAdapter { // ── Audio message handler ── - this.bot.on('message:audio', async (ctx) => { + bot.on('message:audio', async (ctx) => { if (!this.messageHandler) {return;} const audio = ctx.message.audio; @@ -434,22 +463,33 @@ export class TelegramAdapter implements ChannelAdapter { // ── Start long polling ── - this.bot.start({ - onStart: (botInfo) => { - console.log(`Telegram bot started: @${botInfo.username}`); - this.botInfo = { id: botInfo.id, username: botInfo.username }; + try { + await bot.start({ + onStart: (botInfo) => { + console.log(`Telegram bot started: @${botInfo.username}`); + this.botInfo = { id: botInfo.id, username: botInfo.username }; + this._status = 'connected'; + this.reconnectAttempt = 0; + this._lastError = undefined; + this._lastErrorAt = undefined; + }, + }); + if (this._status === 'connecting') { + // For mocked/runtime environments where onStart is not surfaced synchronously. this._status = 'connected'; - }, - }); - - // bot.start() returns immediately for long polling. - // The onStart callback sets connected above; also set here for safety - // in case the callback fires before this line is reached. - this._status = 'connected'; + } + } catch (error) { + const description = error instanceof Error ? error.message : String(error); + this.recordAdapterError(`Telegram connect failed: ${description}`); + this.scheduleReconnect(); + throw error; + } } /** Stop the bot and clean up. */ async disconnect(): Promise { + this.shouldRun = false; + this.clearReconnectTimer(); if (this.bot) { await this.bot.stop(); this.bot = null; @@ -481,7 +521,10 @@ export class TelegramAdapter implements ChannelAdapter { // is strict and can fail on unescaped characters. If Telegram rejects the // message, retry once without parse_mode so users still get the content. try { - await bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' }); + await this.sendWithRetry( + () => bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' }), + `sendMessage(markdown) chat=${chatId}`, + ); } catch (error) { const description = error && typeof error === 'object' && 'description' in error ? String((error as { description?: unknown }).description) @@ -494,7 +537,10 @@ export class TelegramAdapter implements ChannelAdapter { throw error; } - await bot.api.sendMessage(chatId, chunk); + await this.sendWithRetry( + () => bot.api.sendMessage(chatId, chunk), + `sendMessage(plain) chat=${chatId}`, + ); } }; @@ -518,23 +564,143 @@ export class TelegramAdapter implements ChannelAdapter { /** Send a single outbound attachment via the Telegram API. */ private async sendAttachment(chatId: number, attachment: OutboundAttachment): Promise { - if (!this.bot) {return;} + const bot = this.bot; + if (!bot) {return;} + + const file = attachment.data + ? new InputFile(Buffer.from(attachment.data, 'base64'), attachment.filename) + : attachment.url ?? ''; try { - const file = attachment.data - ? new InputFile(Buffer.from(attachment.data, 'base64'), attachment.filename) - : attachment.url ?? ''; - if (attachment.mimeType.startsWith('image/')) { - await this.bot.api.sendPhoto(chatId, file); + await this.sendWithRetry(() => bot.api.sendPhoto(chatId, file), `sendPhoto chat=${chatId}`); } else { - await this.bot.api.sendDocument(chatId, file); + await this.sendWithRetry(() => bot.api.sendDocument(chatId, file), `sendDocument chat=${chatId}`); } } catch (error) { + this.recordAdapterError(`Telegram attachment send failed: ${error instanceof Error ? error.message : String(error)}`); console.error( `Failed to send ${attachment.mimeType} attachment to ${chatId}:`, error instanceof Error ? error.message : 'Unknown error', ); + throw error; + } + } + + private recordAdapterError(message: string): void { + this._status = 'error'; + this._lastError = message; + this._lastErrorAt = Date.now(); + } + + private clearReconnectTimer(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } + + private scheduleReconnect(): void { + if (!this.shouldRun || !this.bot) { + return; + } + if (this.reconnectTimer) { + return; + } + + const attempt = this.reconnectAttempt + 1; + this.reconnectAttempt = attempt; + const delayMs = Math.min( + 30_000, + attempt <= 1 ? 1_000 : attempt === 2 ? 2_000 : attempt === 3 ? 5_000 : 10_000, + ); + + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + void this.reconnect().catch((error) => { + this.recordAdapterError(`Telegram reconnect failed: ${error instanceof Error ? error.message : String(error)}`); + this.scheduleReconnect(); + }); + }, delayMs); + } + + private async reconnect(): Promise { + const bot = this.bot; + if (!this.shouldRun || !bot) { + return; + } + + this._status = 'connecting'; + await bot.stop(); + await bot.start({ + onStart: (botInfo) => { + console.log(`Telegram bot reconnected: @${botInfo.username}`); + this.botInfo = { id: botInfo.id, username: botInfo.username }; + this._status = 'connected'; + this.reconnectAttempt = 0; + this._lastError = undefined; + this._lastErrorAt = undefined; + }, + }); + + if (this._status === 'connecting') { + this._status = 'connected'; + } + } + + private parseTelegramError(error: unknown): { + code?: number; + description?: string; + retryAfterSec?: number; + message: string; + transient: boolean; + } { + const code = typeof error === 'object' && error !== null && 'error_code' in error + ? Number((error as { error_code?: unknown }).error_code) + : undefined; + const description = typeof error === 'object' && error !== null && 'description' in error + ? String((error as { description?: unknown }).description) + : undefined; + const retryAfterRaw = typeof error === 'object' && error !== null && 'parameters' in error + ? (error as { parameters?: { retry_after?: unknown } }).parameters?.retry_after + : undefined; + const retryAfterSec = Number.isFinite(Number(retryAfterRaw)) ? Number(retryAfterRaw) : undefined; + const message = error instanceof Error + ? error.message + : description ?? String(error); + const normalized = message.toLowerCase(); + const transient = code === 429 + || (typeof code === 'number' && code >= 500) + || normalized.includes('timeout') + || normalized.includes('timed out') + || normalized.includes('fetch failed') + || normalized.includes('econnreset') + || normalized.includes('enotfound') + || normalized.includes('network'); + + return { code, description, retryAfterSec, message, transient }; + } + + private async sendWithRetry(fn: () => Promise, context: string): Promise { + const maxAttempts = 3; + let attempt = 0; + + while (true) { + attempt += 1; + try { + return await fn(); + } catch (error) { + const parsed = this.parseTelegramError(error); + if (!parsed.transient || attempt >= maxAttempts) { + this.recordAdapterError(`Telegram API failure (${context}): ${parsed.message}`); + throw error; + } + const baseDelay = parsed.retryAfterSec + ? parsed.retryAfterSec * 1000 + : Math.min(8000, 800 * (2 ** (attempt - 1))); + const jitter = Math.floor(Math.random() * 250); + await new Promise((resolve) => setTimeout(resolve, baseDelay + jitter)); + } } } } diff --git a/src/channels/types.ts b/src/channels/types.ts index b1a96a5..244989c 100644 --- a/src/channels/types.ts +++ b/src/channels/types.ts @@ -85,6 +85,12 @@ export interface ChannelAdapter { /** Current connection status. */ readonly status: ChannelStatus; + /** Optional last adapter error message (for diagnostics surfaces). */ + readonly lastError?: string; + + /** Unix ms timestamp for last adapter error. */ + readonly lastErrorAt?: number; + /** Start the adapter (connect to platform, begin listening). */ connect(): Promise; diff --git a/src/cli/doctor.ts b/src/cli/doctor.ts index c09901b..d0632ba 100644 --- a/src/cli/doctor.ts +++ b/src/cli/doctor.ts @@ -378,7 +378,12 @@ const checkTelegram: Check = async (ctx) => { if (!ctx.config.telegram.bot_token || ctx.config.telegram.bot_token.length < 10) { return { status: 'warn', label: 'Telegram bot configured', detail: 'token looks too short' }; } - return { status: 'pass', label: 'Telegram bot configured', detail: `(${ctx.config.telegram.allowed_chat_ids.length} allowed chat(s))` }; + const mentionMode = ctx.config.telegram.require_mention ? 'mention-gated groups' : 'all allowed group messages'; + return { + status: 'pass', + label: 'Telegram bot configured', + detail: `(${ctx.config.telegram.allowed_chat_ids.length} allowed chat(s), mode: ${mentionMode})`, + }; }; const checkMcpServers: Check = async (ctx) => { diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 05b6320..566ecc4 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -1,6 +1,18 @@ import { describe, it, expect } from 'vitest'; import { configSchema } from './schema.js'; +describe('configSchema — telegram', () => { + const minimalConfig = { + telegram: { bot_token: 'test', allowed_chat_ids: [1] }, + models: { default: { provider: 'anthropic', model: 'claude-3' } }, + }; + + it('defaults telegram require_mention to false', () => { + const result = configSchema.parse(minimalConfig); + expect(result.telegram?.require_mention).toBe(false); + }); +}); + describe('configSchema — sandbox', () => { const minimalConfig = { telegram: { bot_token: 'test', allowed_chat_ids: [1] }, diff --git a/src/config/schema.ts b/src/config/schema.ts index 9e96df1..b5d5a49 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -3,7 +3,7 @@ import { z } from 'zod'; const telegramSchema = z.object({ bot_token: z.string().min(1, 'Bot token is required'), allowed_chat_ids: z.array(z.number()).min(1, 'At least one chat ID required'), - require_mention: z.boolean().default(true), + require_mention: z.boolean().default(false), }); const tailscaleSchema = z.object({ diff --git a/src/gateway/handlers/services.test.ts b/src/gateway/handlers/services.test.ts index 702bb33..6dc38f9 100644 --- a/src/gateway/handlers/services.test.ts +++ b/src/gateway/handlers/services.test.ts @@ -116,6 +116,29 @@ describe('discoverServices', () => { expect(services.find(s => s.name === 'telegram')?.status).toBe('connected'); }); + it('surfaces adapter error details when channel is in error state', () => { + const cfg = makeBaseConfig(); + withMutableConfig(cfg).telegram = { bot_token: 'x', allowed_chat_ids: [123], require_mention: false }; + + const reg = new ChannelRegistry(); + reg.register({ + name: 'telegram', + status: 'error', + lastError: 'Telegram polling error: 429 Too Many Requests', + lastErrorAt: 1708080000000, + connect: async () => {}, + disconnect: async () => {}, + send: async () => {}, + onMessage: () => {}, + }); + + const services = discoverServices(cfg, reg); + const telegram = services.find(s => s.name === 'telegram'); + expect(telegram?.status).toBe('error'); + expect(telegram?.error).toContain('429'); + expect(telegram?.metadata).toMatchObject({ lastErrorAt: 1708080000000 }); + }); + it('marks enabled automation subsystems as configured and carries item counts', () => { const cfg = makeBaseConfig(); cfg.automation.cron = [ diff --git a/src/gateway/handlers/services.ts b/src/gateway/handlers/services.ts index 192716d..6daef29 100644 --- a/src/gateway/handlers/services.ts +++ b/src/gateway/handlers/services.ts @@ -73,7 +73,14 @@ export function discoverServices( } if (registered) { - services.push({ name, type: 'channel', status: registered.status as ServiceStatus, description }); + services.push({ + name, + type: 'channel', + status: registered.status as ServiceStatus, + description, + ...(registered.lastError ? { error: registered.lastError } : {}), + ...(registered.lastErrorAt ? { metadata: { lastErrorAt: registered.lastErrorAt } } : {}), + }); continue; } diff --git a/src/gateway/handlers/system.ts b/src/gateway/handlers/system.ts index e364f52..2132c73 100644 --- a/src/gateway/handlers/system.ts +++ b/src/gateway/handlers/system.ts @@ -73,7 +73,7 @@ export interface SystemHandlerDeps { getConnectionCount: () => number; /** Optional callback to trigger a graceful restart. If not provided, system.restart returns an error. */ restart?: () => Promise; - getChannels?: () => Array<{ name: string; status: string }>; + getChannels?: () => Array<{ name: string; status: string; error?: string; lastErrorAt?: number }>; getUsage?: () => { totalSessions: number; activeConnections: number }; /** Optional callback to retrieve per-session token usage data. */ getTokenUsage?: () => TokenUsageEntry[]; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 54c4c8b..e3b7eea 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -213,7 +213,12 @@ export class GatewayServer { getSessionAnalytics: ({ days, topLimit } = {}) => this.config.sessionManager.getSessionAnalytics({ days, topLimit }), restart: this.config.restart, getChannels: channelRegistry - ? () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })) + ? () => channelRegistry.list().map(a => ({ + name: a.name, + status: a.status, + ...(a.lastError ? { error: a.lastError } : {}), + ...(a.lastErrorAt ? { lastErrorAt: a.lastErrorAt } : {}), + })) : undefined, getServices: runtimeConfig && channelRegistry ? () => discoverServices(runtimeConfig, channelRegistry) diff --git a/src/gateway/ui/pages/dashboard.js b/src/gateway/ui/pages/dashboard.js index c7711b9..5fb29eb 100644 --- a/src/gateway/ui/pages/dashboard.js +++ b/src/gateway/ui/pages/dashboard.js @@ -818,6 +818,7 @@ function updateServices(servicesData) { ${escapeHtml(svc.status)} ${escapeHtml(svc.description)} + ${svc.error ? `Error: ${escapeHtml(String(svc.error))}` : ''} `; }).join(''); }