Add Zalo channel adapter with webhook and send path

This commit is contained in:
William Valentin
2026-02-16 13:11:51 -08:00
parent 891ccb696e
commit 8bed99c770
16 changed files with 491 additions and 6 deletions
+1
View File
@@ -23,4 +23,5 @@ export { GoogleChatAdapter, type GoogleChatAdapterConfig } from './googleChat/in
export { BlueBubblesAdapter, type BlueBubblesAdapterConfig } from './bluebubbles/index.js';
export { LineAdapter, type LineAdapterConfig } from './line/index.js';
export { FeishuAdapter, type FeishuAdapterConfig } from './feishu/index.js';
export { ZaloAdapter, type ZaloAdapterConfig } from './zalo/index.js';
export { PairingManager, type PairingConfig, type PairingStore, type ApprovedSender } from './pairing.js';
+106
View File
@@ -0,0 +1,106 @@
import { describe, expect, it, vi, beforeEach } from 'vitest';
import type { IncomingMessage, ServerResponse } from 'http';
import { ZaloAdapter } from './adapter.js';
const mockFetch = vi.fn();
vi.stubGlobal('fetch', mockFetch);
function mockReq(body: string, token?: string): IncomingMessage {
const req = {
headers: token ? { 'x-zalo-token': token } : {},
on(event: string, handler: (...args: unknown[]) => void) {
if (event === 'data') {
handler(Buffer.from(body, 'utf8'));
}
if (event === 'end') {
handler();
}
return this;
},
off: () => req,
destroy: () => undefined,
} as unknown as IncomingMessage;
return req;
}
function mockRes() {
const state = { statusCode: 0, body: '' };
const res = {
writeHead: (code: number) => {
state.statusCode = code;
},
end: (chunk?: string) => {
state.body = chunk ?? '';
},
} as unknown as ServerResponse;
return { res, state };
}
describe('ZaloAdapter', () => {
beforeEach(() => {
vi.clearAllMocks();
mockFetch.mockReset();
});
it('has name zalo and starts disconnected', () => {
const adapter = new ZaloAdapter({ oaAccessToken: 'token' });
expect(adapter.name).toBe('zalo');
expect(adapter.status).toBe('disconnected');
});
it('send posts to zalo message API', async () => {
const adapter = new ZaloAdapter({ oaAccessToken: 'token' });
await adapter.connect();
mockFetch.mockResolvedValue({
ok: true,
status: 200,
text: async () => '',
} as Response);
await adapter.send('uid-1', { text: 'hello zalo' });
expect(mockFetch).toHaveBeenCalledTimes(1);
expect(String(mockFetch.mock.calls[0]?.[0])).toContain('/v3.0/oa/message/cs');
});
it('handleEvent emits inbound message', async () => {
const adapter = new ZaloAdapter({ oaAccessToken: 'token', requireMention: false });
const inbound: Array<{ channel: string; senderId: string; text: string }> = [];
adapter.onMessage((msg) => inbound.push({ channel: msg.channel, senderId: msg.senderId, text: msg.text }));
await adapter.handleEvent({
sender: { id: 'uid-1' },
recipient: { id: 'oa-1' },
message: { msg_id: 'm1', text: 'ping' },
timestamp: 123,
});
expect(inbound).toEqual([{ channel: 'zalo', senderId: 'uid-1', text: 'ping' }]);
});
it('enforces webhook token when configured', async () => {
const adapter = new ZaloAdapter({ oaAccessToken: 'token', webhookToken: 'secret' });
const body = JSON.stringify({
sender: { id: 'uid-1' },
message: { msg_id: 'm1', text: 'ping' },
});
const req = mockReq(body, 'wrong');
const { res, state } = mockRes();
await adapter.handleRequest(req, res);
expect(state.statusCode).toBe(401);
});
it('drops messages missing required mention', async () => {
const adapter = new ZaloAdapter({ oaAccessToken: 'token', requireMention: true, mentionName: 'flynn' });
const handler = vi.fn();
adapter.onMessage(handler);
await adapter.handleEvent({
sender: { id: 'uid-1' },
message: { msg_id: 'm1', text: 'hello there' },
});
expect(handler).not.toHaveBeenCalled();
});
});
+177
View File
@@ -0,0 +1,177 @@
import type { IncomingMessage, ServerResponse } from 'http';
import type {
InboundMessage,
OutboundMessage,
ChannelAdapter,
ChannelStatus,
} from '../types.js';
import { shouldIgnoreForMissingMention, splitMessage } from '../utils.js';
import { readRequestBody } from '../../utils/httpBody.js';
export interface ZaloAdapterConfig {
oaAccessToken: string;
endpoint?: string;
webhookToken?: string;
allowedUserIds?: string[];
requireMention?: boolean;
mentionName?: string;
}
interface ZaloWebhookEvent {
token?: string;
sender?: { id?: string };
recipient?: { id?: string };
message?: {
msg_id?: string;
text?: string;
};
timestamp?: number;
}
const MAX_MESSAGE_LENGTH = 3500;
export class ZaloAdapter implements ChannelAdapter {
readonly name = 'zalo';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
constructor(private readonly config: ZaloAdapterConfig) {}
get status(): ChannelStatus {
return this._status;
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
async connect(): Promise<void> {
this._status = 'connected';
}
async disconnect(): Promise<void> {
this._status = 'disconnected';
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
if (this._status !== 'connected') {
throw new Error('Zalo adapter not connected');
}
const text = message.text.trim();
if (!text) {
return;
}
const chunks = text.length > MAX_MESSAGE_LENGTH ? splitMessage(text, MAX_MESSAGE_LENGTH) : [text];
for (const chunk of chunks) {
await this.sendText(peerId, chunk);
}
}
async handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
let body = '';
try {
body = await readRequestBody(req, { maxBytes: 1_048_576 });
} catch {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid request body' }));
return;
}
let payload: ZaloWebhookEvent;
try {
payload = JSON.parse(body) as ZaloWebhookEvent;
} catch {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid JSON' }));
return;
}
if (this.config.webhookToken) {
const headerToken = req.headers['x-zalo-token'];
const token = typeof headerToken === 'string' ? headerToken : payload.token;
if (token !== this.config.webhookToken) {
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid webhook token' }));
return;
}
}
await this.handleEvent(payload);
res.writeHead(202, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ accepted: true }));
}
async handleEvent(payload: ZaloWebhookEvent): Promise<void> {
if (!this.messageHandler) {
return;
}
const senderId = payload.sender?.id?.trim();
if (!senderId) {
return;
}
if (this.config.allowedUserIds && this.config.allowedUserIds.length > 0) {
if (!this.config.allowedUserIds.includes(senderId)) {
return;
}
}
const text = (payload.message?.text ?? '').trim();
if (!text) {
return;
}
const mentionName = this.config.mentionName ?? 'flynn';
const mentionRegex = new RegExp(`(?:^|\\s)@?${escapeRegex(mentionName)}(?:\\b|:)`, 'i');
if (shouldIgnoreForMissingMention({
requireMention: this.config.requireMention,
defaultRequireMention: true,
mentionsBot: mentionRegex.test(text),
})) {
return;
}
const cleaned = text.replace(new RegExp(`^\\s*@?${escapeRegex(mentionName)}(?:\\b|:)\\s*`, 'i'), '').trim();
if (!cleaned) {
return;
}
this.messageHandler({
id: payload.message?.msg_id ?? `zalo-${Date.now()}`,
channel: 'zalo',
senderId,
text: cleaned,
timestamp: typeof payload.timestamp === 'number' ? payload.timestamp : Date.now(),
metadata: {
replyPeerId: senderId,
recipientId: payload.recipient?.id,
},
});
}
private async sendText(userId: string, text: string): Promise<void> {
const endpoint = `${(this.config.endpoint ?? 'https://openapi.zalo.me').replace(/\/+$/, '')}/v3.0/oa/message/cs`;
const response = await fetch(endpoint, {
method: 'POST',
headers: {
access_token: this.config.oaAccessToken,
'Content-Type': 'application/json',
},
body: JSON.stringify({
recipient: { user_id: userId },
message: { text },
}),
});
if (!response.ok) {
const body = await response.text().catch(() => '');
throw new Error(`Zalo send failed (${response.status}): ${body}`);
}
}
}
function escapeRegex(value: string): string {
return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}
+1
View File
@@ -0,0 +1 @@
export { ZaloAdapter, type ZaloAdapterConfig } from './adapter.js';