feat(channels): share line and zalo binary attachments via minio
This commit is contained in:
@@ -104,6 +104,48 @@ describe('LineAdapter', () => {
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('uploads binary attachments to MinIO and sends share URL when configured', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
const adapter = new LineAdapter({
|
||||
channelAccessToken: 'token',
|
||||
channelSecret: 'secret',
|
||||
minio: {
|
||||
enabled: true,
|
||||
endpoint: 'localhost:9000',
|
||||
accessKey: 'minio',
|
||||
secretKey: 'secret',
|
||||
bucket: 'flynn',
|
||||
prefix: 'channels/line',
|
||||
secure: false,
|
||||
},
|
||||
minioExecRunner: vi.fn(async (_file, args) => {
|
||||
if (args[0] === 'share') {
|
||||
return { stdout: '{"share":"https://minio.local/share/file.png"}\n', stderr: '' };
|
||||
}
|
||||
return { stdout: '', stderr: '' };
|
||||
}),
|
||||
});
|
||||
await adapter.connect();
|
||||
mockFetch.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
text: async () => '',
|
||||
} as Response);
|
||||
|
||||
await adapter.send('U123', {
|
||||
text: '',
|
||||
attachments: [
|
||||
{ mimeType: 'image/png', data: 'aGVsbG8=', filename: 'file.png' },
|
||||
],
|
||||
});
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
const body = JSON.parse(String(mockFetch.mock.calls[0]?.[1]?.body ?? '{}'));
|
||||
expect(body.messages?.[0]?.text).toBe('file.png: https://minio.local/share/file.png');
|
||||
expect(warnSpy).not.toHaveBeenCalled();
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('send delivers URL attachment even when text is empty', async () => {
|
||||
const adapter = new LineAdapter({
|
||||
channelAccessToken: 'token',
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
import { createHmac, timingSafeEqual } from 'crypto';
|
||||
import { execFile } from 'node:child_process';
|
||||
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
|
||||
import type { IncomingMessage, ServerResponse } from 'http';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { promisify } from 'node:util';
|
||||
|
||||
import type {
|
||||
InboundMessage,
|
||||
@@ -9,6 +14,23 @@ import type {
|
||||
} from '../types.js';
|
||||
import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js';
|
||||
import { readRequestBody } from '../../utils/httpBody.js';
|
||||
import { backupInternals } from '../../backup/index.js';
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
type ExecRunner = (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => Promise<{ stdout: string; stderr: string }>;
|
||||
|
||||
export interface MinioShareConfig {
|
||||
enabled: boolean;
|
||||
endpoint: string;
|
||||
accessKey: string;
|
||||
secretKey: string;
|
||||
bucket: string;
|
||||
prefix?: string;
|
||||
secure?: boolean;
|
||||
expires?: string;
|
||||
mcPath?: string;
|
||||
}
|
||||
|
||||
export interface LineAdapterConfig {
|
||||
channelAccessToken: string;
|
||||
@@ -16,6 +38,8 @@ export interface LineAdapterConfig {
|
||||
allowedSourceIds?: string[];
|
||||
requireMention?: boolean;
|
||||
mentionName?: string;
|
||||
minio?: MinioShareConfig;
|
||||
minioExecRunner?: ExecRunner;
|
||||
}
|
||||
|
||||
interface LineWebhookBody {
|
||||
@@ -89,12 +113,64 @@ export class LineAdapter implements ChannelAdapter {
|
||||
continue;
|
||||
}
|
||||
if (attachment.data) {
|
||||
console.warn(`LINE: skipping attachment data (${attachment.mimeType}) — upload not implemented`);
|
||||
await this.sendPush(peerId, formatBinaryAttachmentNotice('LINE', attachment.filename, attachment.mimeType));
|
||||
const minioUrl = await this.uploadBinaryAttachmentToMinio(attachment.data, attachment.filename, attachment.mimeType);
|
||||
if (minioUrl) {
|
||||
const line = attachment.filename ? `${attachment.filename}: ${minioUrl}` : minioUrl;
|
||||
await this.sendPush(peerId, line);
|
||||
} else {
|
||||
console.warn(`LINE: skipping attachment data (${attachment.mimeType}) — upload not implemented`);
|
||||
await this.sendPush(peerId, formatBinaryAttachmentNotice('LINE', attachment.filename, attachment.mimeType));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async uploadBinaryAttachmentToMinio(base64Data: string, filename?: string, mimeType?: string): Promise<string | null> {
|
||||
const minio = this.config.minio;
|
||||
if (!minio?.enabled || !minio.endpoint || !minio.accessKey || !minio.secretKey || !minio.bucket) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const tempDir = await mkdtemp(join(tmpdir(), 'flynn-line-'));
|
||||
const safeName = sanitizeFilename(filename) || inferFilenameFromMimeType(mimeType);
|
||||
const localPath = join(tempDir, `${Date.now()}_${safeName}`);
|
||||
const alias = 'flynnline';
|
||||
const objectKey = backupInternals.buildObjectKey(minio.prefix ?? 'channels/line', `${Date.now()}_${safeName}`);
|
||||
const remotePath = `${alias}/${minio.bucket}/${objectKey}`;
|
||||
const host = backupInternals.buildMinioHost({
|
||||
endpoint: minio.endpoint,
|
||||
accessKey: minio.accessKey,
|
||||
secretKey: minio.secretKey,
|
||||
secure: minio.secure ?? true,
|
||||
});
|
||||
const env = { ...process.env, [`MC_HOST_${alias}`]: host };
|
||||
const runner = this.config.minioExecRunner ?? (async (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => {
|
||||
return execFileAsync(file, args, options);
|
||||
});
|
||||
|
||||
try {
|
||||
await writeFile(localPath, Buffer.from(base64Data, 'base64'));
|
||||
await runner(minio.mcPath ?? 'mc', ['mb', '--ignore-existing', `${alias}/${minio.bucket}`], { env });
|
||||
await runner(minio.mcPath ?? 'mc', ['cp', localPath, remotePath], { env });
|
||||
const { stdout } = await runner(
|
||||
minio.mcPath ?? 'mc',
|
||||
['share', 'download', '--json', '--expire', minio.expires ?? '24h', remotePath],
|
||||
{ env },
|
||||
);
|
||||
const shareUrl = parseShareUrl(typeof stdout === 'string' ? stdout : stdout.toString('utf-8'));
|
||||
if (!shareUrl) {
|
||||
return null;
|
||||
}
|
||||
return shareUrl;
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`LINE: MinIO upload failed (${message})`);
|
||||
return null;
|
||||
} finally {
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
|
||||
let body = '';
|
||||
try {
|
||||
@@ -247,3 +323,49 @@ function formatBinaryAttachmentNotice(channel: string, filename?: string, mimeTy
|
||||
const type = mimeType || 'application/octet-stream';
|
||||
return `[${channel}] Binary attachment not uploaded yet: ${name} (${type}).`;
|
||||
}
|
||||
|
||||
function sanitizeFilename(filename?: string): string {
|
||||
if (!filename) {
|
||||
return '';
|
||||
}
|
||||
return filename.replace(/[^a-zA-Z0-9._-]/g, '_');
|
||||
}
|
||||
|
||||
function inferFilenameFromMimeType(mimeType?: string): string {
|
||||
if (!mimeType) {
|
||||
return 'attachment.bin';
|
||||
}
|
||||
if (mimeType.startsWith('image/jpeg')) {
|
||||
return 'attachment.jpg';
|
||||
}
|
||||
if (mimeType.startsWith('image/png')) {
|
||||
return 'attachment.png';
|
||||
}
|
||||
if (mimeType.startsWith('application/pdf')) {
|
||||
return 'attachment.pdf';
|
||||
}
|
||||
const [, subtype] = mimeType.split('/');
|
||||
if (!subtype) {
|
||||
return 'attachment.bin';
|
||||
}
|
||||
return `attachment.${subtype.split('+')[0]}`;
|
||||
}
|
||||
|
||||
function parseShareUrl(stdout: string): string | null {
|
||||
const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean);
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
const share = typeof parsed.share === 'string' ? parsed.share : undefined;
|
||||
const url = typeof parsed.url === 'string' ? parsed.url : undefined;
|
||||
if (share) {return share;}
|
||||
if (url) {return url;}
|
||||
} catch {
|
||||
const match = line.match(/https?:\/\/\S+/);
|
||||
if (match) {
|
||||
return match[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -90,6 +90,47 @@ describe('ZaloAdapter', () => {
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('uploads binary attachments to MinIO and sends share URL when configured', async () => {
|
||||
const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {});
|
||||
const adapter = new ZaloAdapter({
|
||||
oaAccessToken: 'token',
|
||||
minio: {
|
||||
enabled: true,
|
||||
endpoint: 'localhost:9000',
|
||||
accessKey: 'minio',
|
||||
secretKey: 'secret',
|
||||
bucket: 'flynn',
|
||||
prefix: 'channels/zalo',
|
||||
secure: false,
|
||||
},
|
||||
minioExecRunner: vi.fn(async (_file, args) => {
|
||||
if (args[0] === 'share') {
|
||||
return { stdout: '{"share":"https://minio.local/share/file.pdf"}\n', stderr: '' };
|
||||
}
|
||||
return { stdout: '', stderr: '' };
|
||||
}),
|
||||
});
|
||||
await adapter.connect();
|
||||
mockFetch.mockResolvedValue({
|
||||
ok: true,
|
||||
status: 200,
|
||||
text: async () => '',
|
||||
} as Response);
|
||||
|
||||
await adapter.send('uid-1', {
|
||||
text: '',
|
||||
attachments: [
|
||||
{ mimeType: 'application/pdf', data: 'aGVsbG8=', filename: 'file.pdf' },
|
||||
],
|
||||
});
|
||||
|
||||
expect(mockFetch).toHaveBeenCalledTimes(1);
|
||||
const body = JSON.parse(String(mockFetch.mock.calls[0]?.[1]?.body ?? '{}'));
|
||||
expect(body.message?.text).toBe('file.pdf: https://minio.local/share/file.pdf');
|
||||
expect(warnSpy).not.toHaveBeenCalled();
|
||||
warnSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('send delivers URL attachment even when text is empty', async () => {
|
||||
const adapter = new ZaloAdapter({ oaAccessToken: 'token' });
|
||||
await adapter.connect();
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
import { execFile } from 'node:child_process';
|
||||
import { mkdtemp, rm, writeFile } from 'node:fs/promises';
|
||||
import type { IncomingMessage, ServerResponse } from 'http';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { promisify } from 'node:util';
|
||||
|
||||
import type {
|
||||
InboundMessage,
|
||||
@@ -8,6 +13,23 @@ import type {
|
||||
} from '../types.js';
|
||||
import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js';
|
||||
import { readRequestBody } from '../../utils/httpBody.js';
|
||||
import { backupInternals } from '../../backup/index.js';
|
||||
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
type ExecRunner = (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => Promise<{ stdout: string; stderr: string }>;
|
||||
|
||||
export interface MinioShareConfig {
|
||||
enabled: boolean;
|
||||
endpoint: string;
|
||||
accessKey: string;
|
||||
secretKey: string;
|
||||
bucket: string;
|
||||
prefix?: string;
|
||||
secure?: boolean;
|
||||
expires?: string;
|
||||
mcPath?: string;
|
||||
}
|
||||
|
||||
export interface ZaloAdapterConfig {
|
||||
oaAccessToken: string;
|
||||
@@ -16,6 +38,8 @@ export interface ZaloAdapterConfig {
|
||||
allowedUserIds?: string[];
|
||||
requireMention?: boolean;
|
||||
mentionName?: string;
|
||||
minio?: MinioShareConfig;
|
||||
minioExecRunner?: ExecRunner;
|
||||
}
|
||||
|
||||
interface ZaloWebhookEvent {
|
||||
@@ -79,12 +103,64 @@ export class ZaloAdapter implements ChannelAdapter {
|
||||
continue;
|
||||
}
|
||||
if (attachment.data) {
|
||||
console.warn(`Zalo: skipping attachment data (${attachment.mimeType}) — upload not implemented`);
|
||||
await this.sendText(peerId, formatBinaryAttachmentNotice('Zalo', attachment.filename, attachment.mimeType));
|
||||
const minioUrl = await this.uploadBinaryAttachmentToMinio(attachment.data, attachment.filename, attachment.mimeType);
|
||||
if (minioUrl) {
|
||||
const line = attachment.filename ? `${attachment.filename}: ${minioUrl}` : minioUrl;
|
||||
await this.sendText(peerId, line);
|
||||
} else {
|
||||
console.warn(`Zalo: skipping attachment data (${attachment.mimeType}) — upload not implemented`);
|
||||
await this.sendText(peerId, formatBinaryAttachmentNotice('Zalo', attachment.filename, attachment.mimeType));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async uploadBinaryAttachmentToMinio(base64Data: string, filename?: string, mimeType?: string): Promise<string | null> {
|
||||
const minio = this.config.minio;
|
||||
if (!minio?.enabled || !minio.endpoint || !minio.accessKey || !minio.secretKey || !minio.bucket) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const tempDir = await mkdtemp(join(tmpdir(), 'flynn-zalo-'));
|
||||
const safeName = sanitizeFilename(filename) || inferFilenameFromMimeType(mimeType);
|
||||
const localPath = join(tempDir, `${Date.now()}_${safeName}`);
|
||||
const alias = 'flynnzalo';
|
||||
const objectKey = backupInternals.buildObjectKey(minio.prefix ?? 'channels/zalo', `${Date.now()}_${safeName}`);
|
||||
const remotePath = `${alias}/${minio.bucket}/${objectKey}`;
|
||||
const host = backupInternals.buildMinioHost({
|
||||
endpoint: minio.endpoint,
|
||||
accessKey: minio.accessKey,
|
||||
secretKey: minio.secretKey,
|
||||
secure: minio.secure ?? true,
|
||||
});
|
||||
const env = { ...process.env, [`MC_HOST_${alias}`]: host };
|
||||
const runner = this.config.minioExecRunner ?? (async (file: string, args: string[], options?: { env?: NodeJS.ProcessEnv }) => {
|
||||
return execFileAsync(file, args, options);
|
||||
});
|
||||
|
||||
try {
|
||||
await writeFile(localPath, Buffer.from(base64Data, 'base64'));
|
||||
await runner(minio.mcPath ?? 'mc', ['mb', '--ignore-existing', `${alias}/${minio.bucket}`], { env });
|
||||
await runner(minio.mcPath ?? 'mc', ['cp', localPath, remotePath], { env });
|
||||
const { stdout } = await runner(
|
||||
minio.mcPath ?? 'mc',
|
||||
['share', 'download', '--json', '--expire', minio.expires ?? '24h', remotePath],
|
||||
{ env },
|
||||
);
|
||||
const shareUrl = parseShareUrl(typeof stdout === 'string' ? stdout : stdout.toString('utf-8'));
|
||||
if (!shareUrl) {
|
||||
return null;
|
||||
}
|
||||
return shareUrl;
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
console.warn(`Zalo: MinIO upload failed (${message})`);
|
||||
return null;
|
||||
} finally {
|
||||
await rm(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
|
||||
let body = '';
|
||||
try {
|
||||
@@ -196,3 +272,49 @@ function formatBinaryAttachmentNotice(channel: string, filename?: string, mimeTy
|
||||
const type = mimeType || 'application/octet-stream';
|
||||
return `[${channel}] Binary attachment not uploaded yet: ${name} (${type}).`;
|
||||
}
|
||||
|
||||
function sanitizeFilename(filename?: string): string {
|
||||
if (!filename) {
|
||||
return '';
|
||||
}
|
||||
return filename.replace(/[^a-zA-Z0-9._-]/g, '_');
|
||||
}
|
||||
|
||||
function inferFilenameFromMimeType(mimeType?: string): string {
|
||||
if (!mimeType) {
|
||||
return 'attachment.bin';
|
||||
}
|
||||
if (mimeType.startsWith('image/jpeg')) {
|
||||
return 'attachment.jpg';
|
||||
}
|
||||
if (mimeType.startsWith('image/png')) {
|
||||
return 'attachment.png';
|
||||
}
|
||||
if (mimeType.startsWith('application/pdf')) {
|
||||
return 'attachment.pdf';
|
||||
}
|
||||
const [, subtype] = mimeType.split('/');
|
||||
if (!subtype) {
|
||||
return 'attachment.bin';
|
||||
}
|
||||
return `attachment.${subtype.split('+')[0]}`;
|
||||
}
|
||||
|
||||
function parseShareUrl(stdout: string): string | null {
|
||||
const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean);
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as Record<string, unknown>;
|
||||
const share = typeof parsed.share === 'string' ? parsed.share : undefined;
|
||||
const url = typeof parsed.url === 'string' ? parsed.url : undefined;
|
||||
if (share) {return share;}
|
||||
if (url) {return url;}
|
||||
} catch {
|
||||
const match = line.match(/https?:\/\/\S+/);
|
||||
if (match) {
|
||||
return match[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -18,8 +18,33 @@ export interface ChannelsResult {
|
||||
gmailWatcher?: GmailWatcher;
|
||||
}
|
||||
|
||||
function resolveChannelMinioShare(config: Config): {
|
||||
enabled: boolean;
|
||||
endpoint: string;
|
||||
accessKey: string;
|
||||
secretKey: string;
|
||||
bucket: string;
|
||||
prefix: string;
|
||||
secure: boolean;
|
||||
} | undefined {
|
||||
const minio = config.backup.minio;
|
||||
if (!minio.enabled || !minio.endpoint || !minio.access_key || !minio.secret_key || !minio.bucket) {
|
||||
return undefined;
|
||||
}
|
||||
return {
|
||||
enabled: true,
|
||||
endpoint: minio.endpoint,
|
||||
accessKey: minio.access_key,
|
||||
secretKey: minio.secret_key,
|
||||
bucket: minio.bucket,
|
||||
prefix: minio.prefix,
|
||||
secure: minio.secure,
|
||||
};
|
||||
}
|
||||
|
||||
export function registerChannels(deps: ChannelsDeps): ChannelsResult {
|
||||
const { config, channelRegistry, hookEngine, pairingManager, gateway } = deps;
|
||||
const channelMinioShare = resolveChannelMinioShare(config);
|
||||
|
||||
// Register Telegram adapter (if configured)
|
||||
if (config.telegram) {
|
||||
@@ -162,6 +187,9 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
|
||||
allowedSourceIds: config.line.allowed_source_ids.length > 0 ? config.line.allowed_source_ids : undefined,
|
||||
requireMention: config.line.require_mention,
|
||||
mentionName: config.line.mention_name,
|
||||
minio: channelMinioShare
|
||||
? { ...channelMinioShare, prefix: `${channelMinioShare.prefix.replace(/\/+$/, '')}/channels/line` }
|
||||
: undefined,
|
||||
});
|
||||
channelRegistry.register(lineAdapter);
|
||||
gateway.setLineHandler(lineAdapter);
|
||||
@@ -191,6 +219,9 @@ export function registerChannels(deps: ChannelsDeps): ChannelsResult {
|
||||
allowedUserIds: config.zalo.allowed_user_ids.length > 0 ? config.zalo.allowed_user_ids : undefined,
|
||||
requireMention: config.zalo.require_mention,
|
||||
mentionName: config.zalo.mention_name,
|
||||
minio: channelMinioShare
|
||||
? { ...channelMinioShare, prefix: `${channelMinioShare.prefix.replace(/\/+$/, '')}/channels/zalo` }
|
||||
: undefined,
|
||||
});
|
||||
channelRegistry.register(zaloAdapter);
|
||||
gateway.setZaloHandler(zaloAdapter);
|
||||
|
||||
Reference in New Issue
Block a user