From 108641415f920fca4407e36a3fb98b235d7da433 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Tue, 17 Feb 2026 10:45:31 -0800 Subject: [PATCH] feat(channels): share line and zalo binary attachments via minio --- README.md | 4 + config/default.yaml | 2 + docs/plans/state.json | 17 ++++ src/channels/line/adapter.test.ts | 42 ++++++++++ src/channels/line/adapter.ts | 126 +++++++++++++++++++++++++++++- src/channels/zalo/adapter.test.ts | 41 ++++++++++ src/channels/zalo/adapter.ts | 126 +++++++++++++++++++++++++++++- src/daemon/channels.ts | 31 ++++++++ 8 files changed, 385 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index c9b093a..8a09ab4 100644 --- a/README.md +++ b/README.md @@ -214,6 +214,8 @@ line: allowed_source_ids: [] # Empty = allow all users/groups/rooms require_mention: true mention_name: "flynn" +# Binary attachments: when backup.minio.enabled is configured, Flynn uploads +# LINE binary attachments to MinIO and sends a share URL automatically. # LINE webhook endpoint should point to: # POST https:///line/events @@ -239,6 +241,8 @@ zalo: require_mention: true mention_name: "flynn" endpoint: "https://openapi.zalo.me" +# Binary attachments: when backup.minio.enabled is configured, Flynn uploads +# Zalo binary attachments to MinIO and sends a share URL automatically. # Zalo webhook endpoint should point to: # POST https:///zalo/events diff --git a/config/default.yaml b/config/default.yaml index 0540459..836c5f3 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -392,6 +392,8 @@ hooks: # ── Backup ────────────────────────────────────────────────────────── # Snapshot sessions.db, vectors.db (optional), and memory/ into a tarball. # If MinIO is enabled, upload with `mc` using ephemeral credentials. +# LINE/Zalo adapters also reuse backup.minio credentials for binary-attachment +# URL sharing when those channels are configured. # # backup: # enabled: false diff --git a/docs/plans/state.json b/docs/plans/state.json index 8ba6887..42cc9e1 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -3695,6 +3695,23 @@ "docs/plans/state.json" ], "test_status": "pnpm test:run src/backends/external.test.ts + pnpm typecheck passing" + }, + "line-zalo-minio-binary-sharing": { + "status": "completed", + "date": "2026-02-17", + "updated": "2026-02-17", + "summary": "Implemented MinIO-backed binary attachment sharing for LINE and Zalo channels: when `backup.minio` is enabled, binary payloads are uploaded via `mc` and returned as share URLs to recipients; if unavailable, adapters keep explicit in-chat fallback notices.", + "files_modified": [ + "src/channels/line/adapter.ts", + "src/channels/line/adapter.test.ts", + "src/channels/zalo/adapter.ts", + "src/channels/zalo/adapter.test.ts", + "src/daemon/channels.ts", + "README.md", + "config/default.yaml", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/channels/line/adapter.test.ts src/channels/zalo/adapter.test.ts src/daemon/channels.test.ts + pnpm typecheck passing" } }, "overall_progress": { diff --git a/src/channels/line/adapter.test.ts b/src/channels/line/adapter.test.ts index fb65de8..05f468e 100644 --- a/src/channels/line/adapter.test.ts +++ b/src/channels/line/adapter.test.ts @@ -104,6 +104,48 @@ describe('LineAdapter', () => { warnSpy.mockRestore(); }); + it('uploads binary attachments to MinIO and sends share URL when configured', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const adapter = new LineAdapter({ + channelAccessToken: 'token', + channelSecret: 'secret', + minio: { + enabled: true, + endpoint: 'localhost:9000', + accessKey: 'minio', + secretKey: 'secret', + bucket: 'flynn', + prefix: 'channels/line', + secure: false, + }, + minioExecRunner: vi.fn(async (_file, args) => { + if (args[0] === 'share') { + return { stdout: '{"share":"https://minio.local/share/file.png"}\n', stderr: '' }; + } + return { stdout: '', stderr: '' }; + }), + }); + await adapter.connect(); + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => '', + } as Response); + + await adapter.send('U123', { + text: '', + attachments: [ + { mimeType: 'image/png', data: 'aGVsbG8=', filename: 'file.png' }, + ], + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const body = JSON.parse(String(mockFetch.mock.calls[0]?.[1]?.body ?? '{}')); + expect(body.messages?.[0]?.text).toBe('file.png: https://minio.local/share/file.png'); + expect(warnSpy).not.toHaveBeenCalled(); + warnSpy.mockRestore(); + }); + it('send delivers URL attachment even when text is empty', async () => { const adapter = new LineAdapter({ channelAccessToken: 'token', diff --git a/src/channels/line/adapter.ts b/src/channels/line/adapter.ts index dad795c..e0baed2 100644 --- a/src/channels/line/adapter.ts +++ b/src/channels/line/adapter.ts @@ -1,5 +1,10 @@ import { createHmac, timingSafeEqual } from 'crypto'; +import { execFile } from 'node:child_process'; +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; import type { IncomingMessage, ServerResponse } from 'http'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { promisify } from 'node:util'; import type { InboundMessage, @@ -9,6 +14,23 @@ import type { } from '../types.js'; import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js'; import { readRequestBody } from '../../utils/httpBody.js'; +import { backupInternals } from '../../backup/index.js'; + +const execFileAsync = promisify(execFile); + +type ExecRunner = (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => Promise<{ stdout: string; stderr: string }>; + +export interface MinioShareConfig { + enabled: boolean; + endpoint: string; + accessKey: string; + secretKey: string; + bucket: string; + prefix?: string; + secure?: boolean; + expires?: string; + mcPath?: string; +} export interface LineAdapterConfig { channelAccessToken: string; @@ -16,6 +38,8 @@ export interface LineAdapterConfig { allowedSourceIds?: string[]; requireMention?: boolean; mentionName?: string; + minio?: MinioShareConfig; + minioExecRunner?: ExecRunner; } interface LineWebhookBody { @@ -89,12 +113,64 @@ export class LineAdapter implements ChannelAdapter { continue; } if (attachment.data) { - console.warn(`LINE: skipping attachment data (${attachment.mimeType}) — upload not implemented`); - await this.sendPush(peerId, formatBinaryAttachmentNotice('LINE', attachment.filename, attachment.mimeType)); + const minioUrl = await this.uploadBinaryAttachmentToMinio(attachment.data, attachment.filename, attachment.mimeType); + if (minioUrl) { + const line = attachment.filename ? `${attachment.filename}: ${minioUrl}` : minioUrl; + await this.sendPush(peerId, line); + } else { + console.warn(`LINE: skipping attachment data (${attachment.mimeType}) — upload not implemented`); + await this.sendPush(peerId, formatBinaryAttachmentNotice('LINE', attachment.filename, attachment.mimeType)); + } } } } + private async uploadBinaryAttachmentToMinio(base64Data: string, filename?: string, mimeType?: string): Promise { + const minio = this.config.minio; + if (!minio?.enabled || !minio.endpoint || !minio.accessKey || !minio.secretKey || !minio.bucket) { + return null; + } + + const tempDir = await mkdtemp(join(tmpdir(), 'flynn-line-')); + const safeName = sanitizeFilename(filename) || inferFilenameFromMimeType(mimeType); + const localPath = join(tempDir, `${Date.now()}_${safeName}`); + const alias = 'flynnline'; + const objectKey = backupInternals.buildObjectKey(minio.prefix ?? 'channels/line', `${Date.now()}_${safeName}`); + const remotePath = `${alias}/${minio.bucket}/${objectKey}`; + const host = backupInternals.buildMinioHost({ + endpoint: minio.endpoint, + accessKey: minio.accessKey, + secretKey: minio.secretKey, + secure: minio.secure ?? true, + }); + const env = { ...process.env, [`MC_HOST_${alias}`]: host }; + const runner = this.config.minioExecRunner ?? (async (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => { + return execFileAsync(file, args, options); + }); + + try { + await writeFile(localPath, Buffer.from(base64Data, 'base64')); + await runner(minio.mcPath ?? 'mc', ['mb', '--ignore-existing', `${alias}/${minio.bucket}`], { env }); + await runner(minio.mcPath ?? 'mc', ['cp', localPath, remotePath], { env }); + const { stdout } = await runner( + minio.mcPath ?? 'mc', + ['share', 'download', '--json', '--expire', minio.expires ?? '24h', remotePath], + { env }, + ); + const shareUrl = parseShareUrl(typeof stdout === 'string' ? stdout : stdout.toString('utf-8')); + if (!shareUrl) { + return null; + } + return shareUrl; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`LINE: MinIO upload failed (${message})`); + return null; + } finally { + await rm(tempDir, { recursive: true, force: true }); + } + } + async handleRequest(req: IncomingMessage, res: ServerResponse): Promise { let body = ''; try { @@ -247,3 +323,49 @@ function formatBinaryAttachmentNotice(channel: string, filename?: string, mimeTy const type = mimeType || 'application/octet-stream'; return `[${channel}] Binary attachment not uploaded yet: ${name} (${type}).`; } + +function sanitizeFilename(filename?: string): string { + if (!filename) { + return ''; + } + return filename.replace(/[^a-zA-Z0-9._-]/g, '_'); +} + +function inferFilenameFromMimeType(mimeType?: string): string { + if (!mimeType) { + return 'attachment.bin'; + } + if (mimeType.startsWith('image/jpeg')) { + return 'attachment.jpg'; + } + if (mimeType.startsWith('image/png')) { + return 'attachment.png'; + } + if (mimeType.startsWith('application/pdf')) { + return 'attachment.pdf'; + } + const [, subtype] = mimeType.split('/'); + if (!subtype) { + return 'attachment.bin'; + } + return `attachment.${subtype.split('+')[0]}`; +} + +function parseShareUrl(stdout: string): string | null { + const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean); + for (const line of lines) { + try { + const parsed = JSON.parse(line) as Record; + const share = typeof parsed.share === 'string' ? parsed.share : undefined; + const url = typeof parsed.url === 'string' ? parsed.url : undefined; + if (share) {return share;} + if (url) {return url;} + } catch { + const match = line.match(/https?:\/\/\S+/); + if (match) { + return match[0]; + } + } + } + return null; +} diff --git a/src/channels/zalo/adapter.test.ts b/src/channels/zalo/adapter.test.ts index 346d434..b4de759 100644 --- a/src/channels/zalo/adapter.test.ts +++ b/src/channels/zalo/adapter.test.ts @@ -90,6 +90,47 @@ describe('ZaloAdapter', () => { warnSpy.mockRestore(); }); + it('uploads binary attachments to MinIO and sends share URL when configured', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + const adapter = new ZaloAdapter({ + oaAccessToken: 'token', + minio: { + enabled: true, + endpoint: 'localhost:9000', + accessKey: 'minio', + secretKey: 'secret', + bucket: 'flynn', + prefix: 'channels/zalo', + secure: false, + }, + minioExecRunner: vi.fn(async (_file, args) => { + if (args[0] === 'share') { + return { stdout: '{"share":"https://minio.local/share/file.pdf"}\n', stderr: '' }; + } + return { stdout: '', stderr: '' }; + }), + }); + await adapter.connect(); + mockFetch.mockResolvedValue({ + ok: true, + status: 200, + text: async () => '', + } as Response); + + await adapter.send('uid-1', { + text: '', + attachments: [ + { mimeType: 'application/pdf', data: 'aGVsbG8=', filename: 'file.pdf' }, + ], + }); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const body = JSON.parse(String(mockFetch.mock.calls[0]?.[1]?.body ?? '{}')); + expect(body.message?.text).toBe('file.pdf: https://minio.local/share/file.pdf'); + expect(warnSpy).not.toHaveBeenCalled(); + warnSpy.mockRestore(); + }); + it('send delivers URL attachment even when text is empty', async () => { const adapter = new ZaloAdapter({ oaAccessToken: 'token' }); await adapter.connect(); diff --git a/src/channels/zalo/adapter.ts b/src/channels/zalo/adapter.ts index de0471f..080be27 100644 --- a/src/channels/zalo/adapter.ts +++ b/src/channels/zalo/adapter.ts @@ -1,4 +1,9 @@ +import { execFile } from 'node:child_process'; +import { mkdtemp, rm, writeFile } from 'node:fs/promises'; import type { IncomingMessage, ServerResponse } from 'http'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { promisify } from 'node:util'; import type { InboundMessage, @@ -8,6 +13,23 @@ import type { } from '../types.js'; import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js'; import { readRequestBody } from '../../utils/httpBody.js'; +import { backupInternals } from '../../backup/index.js'; + +const execFileAsync = promisify(execFile); + +type ExecRunner = (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => Promise<{ stdout: string; stderr: string }>; + +export interface MinioShareConfig { + enabled: boolean; + endpoint: string; + accessKey: string; + secretKey: string; + bucket: string; + prefix?: string; + secure?: boolean; + expires?: string; + mcPath?: string; +} export interface ZaloAdapterConfig { oaAccessToken: string; @@ -16,6 +38,8 @@ export interface ZaloAdapterConfig { allowedUserIds?: string[]; requireMention?: boolean; mentionName?: string; + minio?: MinioShareConfig; + minioExecRunner?: ExecRunner; } interface ZaloWebhookEvent { @@ -79,12 +103,64 @@ export class ZaloAdapter implements ChannelAdapter { continue; } if (attachment.data) { - console.warn(`Zalo: skipping attachment data (${attachment.mimeType}) — upload not implemented`); - await this.sendText(peerId, formatBinaryAttachmentNotice('Zalo', attachment.filename, attachment.mimeType)); + const minioUrl = await this.uploadBinaryAttachmentToMinio(attachment.data, attachment.filename, attachment.mimeType); + if (minioUrl) { + const line = attachment.filename ? `${attachment.filename}: ${minioUrl}` : minioUrl; + await this.sendText(peerId, line); + } else { + console.warn(`Zalo: skipping attachment data (${attachment.mimeType}) — upload not implemented`); + await this.sendText(peerId, formatBinaryAttachmentNotice('Zalo', attachment.filename, attachment.mimeType)); + } } } } + private async uploadBinaryAttachmentToMinio(base64Data: string, filename?: string, mimeType?: string): Promise { + const minio = this.config.minio; + if (!minio?.enabled || !minio.endpoint || !minio.accessKey || !minio.secretKey || !minio.bucket) { + return null; + } + + const tempDir = await mkdtemp(join(tmpdir(), 'flynn-zalo-')); + const safeName = sanitizeFilename(filename) || inferFilenameFromMimeType(mimeType); + const localPath = join(tempDir, `${Date.now()}_${safeName}`); + const alias = 'flynnzalo'; + const objectKey = backupInternals.buildObjectKey(minio.prefix ?? 'channels/zalo', `${Date.now()}_${safeName}`); + const remotePath = `${alias}/${minio.bucket}/${objectKey}`; + const host = backupInternals.buildMinioHost({ + endpoint: minio.endpoint, + accessKey: minio.accessKey, + secretKey: minio.secretKey, + secure: minio.secure ?? true, + }); + const env = { ...process.env, [`MC_HOST_${alias}`]: host }; + const runner = this.config.minioExecRunner ?? (async (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => { + return execFileAsync(file, args, options); + }); + + try { + await writeFile(localPath, Buffer.from(base64Data, 'base64')); + await runner(minio.mcPath ?? 'mc', ['mb', '--ignore-existing', `${alias}/${minio.bucket}`], { env }); + await runner(minio.mcPath ?? 'mc', ['cp', localPath, remotePath], { env }); + const { stdout } = await runner( + minio.mcPath ?? 'mc', + ['share', 'download', '--json', '--expire', minio.expires ?? '24h', remotePath], + { env }, + ); + const shareUrl = parseShareUrl(typeof stdout === 'string' ? stdout : stdout.toString('utf-8')); + if (!shareUrl) { + return null; + } + return shareUrl; + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.warn(`Zalo: MinIO upload failed (${message})`); + return null; + } finally { + await rm(tempDir, { recursive: true, force: true }); + } + } + async handleRequest(req: IncomingMessage, res: ServerResponse): Promise { let body = ''; try { @@ -196,3 +272,49 @@ function formatBinaryAttachmentNotice(channel: string, filename?: string, mimeTy const type = mimeType || 'application/octet-stream'; return `[${channel}] Binary attachment not uploaded yet: ${name} (${type}).`; } + +function sanitizeFilename(filename?: string): string { + if (!filename) { + return ''; + } + return filename.replace(/[^a-zA-Z0-9._-]/g, '_'); +} + +function inferFilenameFromMimeType(mimeType?: string): string { + if (!mimeType) { + return 'attachment.bin'; + } + if (mimeType.startsWith('image/jpeg')) { + return 'attachment.jpg'; + } + if (mimeType.startsWith('image/png')) { + return 'attachment.png'; + } + if (mimeType.startsWith('application/pdf')) { + return 'attachment.pdf'; + } + const [, subtype] = mimeType.split('/'); + if (!subtype) { + return 'attachment.bin'; + } + return `attachment.${subtype.split('+')[0]}`; +} + +function parseShareUrl(stdout: string): string | null { + const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean); + for (const line of lines) { + try { + const parsed = JSON.parse(line) as Record; + const share = typeof parsed.share === 'string' ? parsed.share : undefined; + const url = typeof parsed.url === 'string' ? parsed.url : undefined; + if (share) {return share;} + if (url) {return url;} + } catch { + const match = line.match(/https?:\/\/\S+/); + if (match) { + return match[0]; + } + } + } + return null; +} diff --git a/src/daemon/channels.ts b/src/daemon/channels.ts index 5fa5029..7c7052a 100644 --- a/src/daemon/channels.ts +++ b/src/daemon/channels.ts @@ -18,8 +18,33 @@ export interface ChannelsResult { gmailWatcher?: GmailWatcher; } +function resolveChannelMinioShare(config: Config): { + enabled: boolean; + endpoint: string; + accessKey: string; + secretKey: string; + bucket: string; + prefix: string; + secure: boolean; +} | undefined { + const minio = config.backup.minio; + if (!minio.enabled || !minio.endpoint || !minio.access_key || !minio.secret_key || !minio.bucket) { + return undefined; + } + return { + enabled: true, + endpoint: minio.endpoint, + accessKey: minio.access_key, + secretKey: minio.secret_key, + bucket: minio.bucket, + prefix: minio.prefix, + secure: minio.secure, + }; +} + export function registerChannels(deps: ChannelsDeps): ChannelsResult { const { config, channelRegistry, hookEngine, pairingManager, gateway } = deps; + const channelMinioShare = resolveChannelMinioShare(config); // Register Telegram adapter (if configured) if (config.telegram) { @@ -162,6 +187,9 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult { allowedSourceIds: config.line.allowed_source_ids.length > 0 ? config.line.allowed_source_ids : undefined, requireMention: config.line.require_mention, mentionName: config.line.mention_name, + minio: channelMinioShare + ? { ...channelMinioShare, prefix: `${channelMinioShare.prefix.replace(/\/+$/, '')}/channels/line` } + : undefined, }); channelRegistry.register(lineAdapter); gateway.setLineHandler(lineAdapter); @@ -191,6 +219,9 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult { allowedUserIds: config.zalo.allowed_user_ids.length > 0 ? config.zalo.allowed_user_ids : undefined, requireMention: config.zalo.require_mention, mentionName: config.zalo.mention_name, + minio: channelMinioShare + ? { ...channelMinioShare, prefix: `${channelMinioShare.prefix.replace(/\/+$/, '')}/channels/zalo` } + : undefined, }); channelRegistry.register(zaloAdapter); gateway.setZaloHandler(zaloAdapter);