Files
flynn/src/channels/telegram/adapter.ts
T
William Valentin b9bfee9c5b feat: add outbound attachment support with media.send tool
Introduces OutboundAttachment type on OutboundMessage, an
OutboundAttachmentCollector (push/drain pattern), and a media.send
tool that queues files for outbound delivery. Each channel adapter
(Telegram, Discord, Slack, WhatsApp) sends attachments after the
text reply. Includes 15 tests for collector and tool.
2026-02-07 09:09:00 -08:00

413 lines
12 KiB
TypeScript

import { Bot, InputFile } from 'grammy';
import type { HookEngine } from '../../hooks/index.js';
import type {
Attachment,
InboundMessage,
OutboundMessage,
OutboundAttachment,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { isAllowedChat } from '../../frontends/telegram/handlers.js';
import { parseConfirmationCallback } from '../../frontends/telegram/confirmations.js';
import { splitMessage } from '../utils.js';
/** Configuration for the Telegram channel adapter. */
export interface TelegramAdapterConfig {
botToken: string;
allowedChatIds: number[];
/** Require bot mention or reply-to-bot to respond in group chats (default: true). */
requireMention?: boolean;
hookEngine?: HookEngine;
}
/**
* Telegram channel adapter backed by grammy.
*
* Handles authentication via allowed-chat-id filtering,
* confirmation callbacks (when a HookEngine is provided),
* and message chunking for Telegram's 4096-char limit.
*/
export class TelegramAdapter implements ChannelAdapter {
readonly name = 'telegram';
private _status: ChannelStatus = 'disconnected';
private bot: Bot | null = null;
private messageHandler?: (msg: InboundMessage) => void;
private config: TelegramAdapterConfig;
private botInfo?: { id: number; username?: string };
get status(): ChannelStatus {
return this._status;
}
constructor(config: TelegramAdapterConfig) {
this.config = config;
}
/** Download a file from Telegram API and convert to base64. */
private async downloadFileToBase64(fileId: string): Promise<string | null> {
try {
const file = await this.bot?.api.getFile(fileId);
if (!file || !file.file_path) return null;
const token = this.config.botToken;
const url = `https://api.telegram.org/file/bot${token}/${file.file_path}`;
const response = await fetch(url);
if (!response.ok) return null;
const buffer = Buffer.from(await response.arrayBuffer());
return buffer.toString('base64');
} catch (error) {
console.error(`Failed to download file ${fileId}:`, error);
return null;
}
}
/** Register the inbound message handler. Called by the registry before connect(). */
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/** Create the grammy bot, wire up middleware & handlers, and start long-polling. */
async connect(): Promise<void> {
this.bot = new Bot(this.config.botToken);
this._status = 'connecting';
// ── Auth middleware — reject messages from unknown chats ──
this.bot.use(async (ctx, next) => {
const chatId = ctx.chat?.id;
if (chatId === undefined || !isAllowedChat(chatId, this.config.allowedChatIds)) {
console.log(`Rejected message from unauthorized chat: ${chatId}`);
return;
}
await next();
});
// ── Confirmation callback handler (requires hookEngine) ──
if (this.config.hookEngine) {
const hookEngine = this.config.hookEngine;
this.bot.on('callback_query:data', async (ctx) => {
const data = ctx.callbackQuery.data;
const parsed = parseConfirmationCallback(data);
if (!parsed) {
await ctx.answerCallbackQuery({ text: 'Invalid action' });
return;
}
const resolved = hookEngine.resolveConfirmation(parsed.id, {
approved: parsed.approved,
reason: parsed.approved ? undefined : 'Denied by user',
});
if (resolved) {
await ctx.answerCallbackQuery({
text: parsed.approved ? '✅ Approved' : '❌ Denied',
});
await ctx.editMessageText(
ctx.callbackQuery.message?.text + `\n\n${parsed.approved ? '✅ Approved' : '❌ Denied'}`,
{ parse_mode: 'Markdown' },
);
} else {
await ctx.answerCallbackQuery({ text: 'Confirmation expired or not found' });
}
});
}
// ── Command handlers ──
this.bot.command('start', async (ctx) => {
await ctx.reply('Flynn is ready. Send me a message!');
});
this.bot.command('reset', async (ctx) => {
// Deliver a special reset message through the channel
if (this.messageHandler) {
this.messageHandler({
id: String(ctx.message?.message_id ?? Date.now()),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: '/reset',
timestamp: Date.now(),
metadata: { isCommand: true, command: 'reset' },
});
}
await ctx.reply('Conversation reset.');
});
// ── Text message handler ──
this.bot.on('message:text', async (ctx) => {
if (!this.messageHandler) return;
// Group chat mention gating
const isGroup = ctx.chat.type === 'group' || ctx.chat.type === 'supergroup';
const requireMention = this.config.requireMention ?? true;
if (isGroup && requireMention && this.botInfo) {
const rawText = ctx.message.text;
const username = this.botInfo.username;
// Check for @bot_username mention
const isMentioned = username
? rawText.includes(`@${username}`)
: false;
// Also allow replies to bot messages
const isReplyToBot = ctx.message.reply_to_message?.from?.id === this.botInfo.id;
if (!isMentioned && !isReplyToBot) {
return;
}
}
let text = ctx.message.text;
// Strip bot mention from text
if (isGroup && this.botInfo?.username) {
text = text.replace(new RegExp(`@${this.botInfo.username}\\b`, 'g'), '').trim();
}
// Show typing indicator while processing
await ctx.replyWithChatAction('typing');
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text,
timestamp: Date.now(),
});
});
// ── Photo message handler ──
this.bot.on('message:photo', async (ctx) => {
if (!this.messageHandler) return;
const photo = ctx.message.photo;
if (!photo || photo.length === 0) return;
const largestPhoto = photo[photo.length - 1];
await ctx.replyWithChatAction('typing');
const imageData = await this.downloadFileToBase64(largestPhoto.file_id);
if (!imageData) {
console.error(`Failed to download photo ${largestPhoto.file_id}`);
return;
}
const caption = ctx.message.caption ?? '';
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: caption,
attachments: [
{
mimeType: 'image/jpeg',
data: imageData,
filename: `photo_${largestPhoto.file_unique_id}.jpg`,
size: largestPhoto.file_size,
},
],
timestamp: Date.now(),
});
});
// ── Image document handler ──
this.bot.on('message:document', async (ctx) => {
if (!this.messageHandler) return;
const document = ctx.message.document;
if (!document) return;
const mimeType = document.mime_type ?? '';
if (!mimeType.startsWith('image/')) return;
await ctx.replyWithChatAction('typing');
const fileData = await this.downloadFileToBase64(document.file_id);
if (!fileData) {
console.error(`Failed to download document ${document.file_id}`);
return;
}
const caption = ctx.message.caption ?? '';
const filename = document.file_name ?? document.file_unique_id;
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: caption,
attachments: [
{
mimeType,
data: fileData,
filename,
size: document.file_size,
},
],
timestamp: Date.now(),
});
});
// ── Voice message handler ──
this.bot.on('message:voice', async (ctx) => {
if (!this.messageHandler) return;
const voice = ctx.message.voice;
if (!voice) return;
await ctx.replyWithChatAction('typing');
const fileData = await this.downloadFileToBase64(voice.file_id);
if (!fileData) {
console.error(`Failed to download voice message ${voice.file_id}`);
return;
}
const caption = ctx.message.caption ?? '';
const mimeType = voice.mime_type ?? 'audio/ogg';
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: caption,
attachments: [
{
mimeType,
data: fileData,
filename: `voice_${voice.file_unique_id}.ogg`,
size: voice.file_size,
},
],
timestamp: Date.now(),
});
});
// ── Audio message handler ──
this.bot.on('message:audio', async (ctx) => {
if (!this.messageHandler) return;
const audio = ctx.message.audio;
if (!audio) return;
await ctx.replyWithChatAction('typing');
const fileData = await this.downloadFileToBase64(audio.file_id);
if (!fileData) {
console.error(`Failed to download audio message ${audio.file_id}`);
return;
}
const caption = ctx.message.caption ?? '';
const mimeType = audio.mime_type ?? 'audio/mpeg';
this.messageHandler({
id: String(ctx.message.message_id),
channel: 'telegram',
senderId: String(ctx.chat.id),
senderName: ctx.from?.first_name,
text: caption,
attachments: [
{
mimeType,
data: fileData,
filename: `audio_${audio.file_unique_id}.${mimeType.split('/')[1]}`,
size: audio.file_size,
},
],
timestamp: Date.now(),
});
});
// ── Start long polling ──
this.bot.start({
onStart: (botInfo) => {
console.log(`Telegram bot started: @${botInfo.username}`);
this.botInfo = { id: botInfo.id, username: botInfo.username };
this._status = 'connected';
},
});
// bot.start() returns immediately for long polling.
// The onStart callback sets connected above; also set here for safety
// in case the callback fires before this line is reached.
this._status = 'connected';
}
/** Stop the bot and clean up. */
async disconnect(): Promise<void> {
if (this.bot) {
await this.bot.stop();
this.bot = null;
}
this._status = 'disconnected';
}
/** Send an outbound message, automatically chunking if it exceeds Telegram's limit. */
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (!this.bot) throw new Error('Telegram adapter not connected');
const chatId = Number(peerId);
const text = message.text;
// Telegram enforces a 4096-character limit per message
if (text.length <= 4096) {
await this.bot.api.sendMessage(chatId, text, { parse_mode: 'Markdown' });
} else {
const chunks = splitMessage(text, 4096);
for (const chunk of chunks) {
await this.bot.api.sendMessage(chatId, chunk, { parse_mode: 'Markdown' });
}
}
// Send outbound attachments after text
if (message.attachments && message.attachments.length > 0) {
for (const attachment of message.attachments) {
await this.sendAttachment(chatId, attachment);
}
}
}
/** Send a single outbound attachment via the Telegram API. */
private async sendAttachment(chatId: number, attachment: OutboundAttachment): Promise<void> {
if (!this.bot) return;
try {
const file = attachment.data
? new InputFile(Buffer.from(attachment.data, 'base64'), attachment.filename)
: attachment.url ?? '';
if (attachment.mimeType.startsWith('image/')) {
await this.bot.api.sendPhoto(chatId, file);
} else {
await this.bot.api.sendDocument(chatId, file);
}
} catch (error) {
console.error(
`Failed to send ${attachment.mimeType} attachment to ${chatId}:`,
error instanceof Error ? error.message : 'Unknown error',
);
}
}
}