feat(gateway): add sender presence tracking

This commit is contained in:
William Valentin
2026-02-15 19:28:16 -08:00
parent 421942f66d
commit c6e3d09ecc
11 changed files with 358 additions and 8 deletions
+65
View File
@@ -132,6 +132,71 @@ describe('ChannelRegistry', () => {
expect(adapter.sendFn).toHaveBeenCalledWith('user-42', { text: 'pong' });
});
it('tracks presence from inbound messages', async () => {
const adapter = createMockAdapter('test-channel');
registry.register(adapter);
registry.setMessageHandler(async () => {});
adapter.triggerMessage(makeMessage('test-channel'));
await vi.waitFor(() => {
expect(registry.getPresence()).toHaveLength(1);
});
const [presence] = registry.getPresence();
expect(presence.channel).toBe('test-channel');
expect(presence.senderId).toBe('user-42');
expect(presence.messageCount).toBe(1);
expect(presence.status).toBe('online');
});
it('marks presence offline after inactivity window', async () => {
vi.useFakeTimers();
try {
registry = new ChannelRegistry({ offlineAfterMs: 1000 });
const adapter = createMockAdapter('test-channel');
registry.register(adapter);
registry.setMessageHandler(async () => {});
adapter.triggerMessage(makeMessage('test-channel'));
await vi.runAllTimersAsync();
vi.advanceTimersByTime(1500);
const [presence] = registry.getPresence();
expect(presence.status).toBe('offline');
} finally {
vi.useRealTimers();
}
});
it('filters presence by channel and status', async () => {
vi.useFakeTimers();
try {
registry = new ChannelRegistry({ offlineAfterMs: 1000 });
const a1 = createMockAdapter('telegram');
const a2 = createMockAdapter('discord');
registry.register(a1);
registry.register(a2);
registry.setMessageHandler(async () => {});
a1.triggerMessage(makeMessage('telegram'));
a2.triggerMessage({ ...makeMessage('discord'), senderId: 'user-99' });
await vi.runAllTimersAsync();
vi.advanceTimersByTime(1500);
a2.triggerMessage({ ...makeMessage('discord'), senderId: 'user-99' });
const telegramOnly = registry.getPresence({ channel: 'telegram' });
expect(telegramOnly).toHaveLength(1);
expect(telegramOnly[0].channel).toBe('telegram');
const onlineOnly = registry.getPresence({ status: 'online' });
expect(onlineOnly).toHaveLength(1);
expect(onlineOnly[0].channel).toBe('discord');
} finally {
vi.useRealTimers();
}
});
it('routes reply using metadata.replyPeerId when provided', async () => {
const adapter = createMockAdapter('test-channel');
registry.register(adapter);
+111
View File
@@ -14,9 +14,49 @@ import type {
OutboundMessage,
} from './types.js';
type PresenceStatus = 'online' | 'offline';
interface PresenceRecord {
channel: string;
senderId: string;
senderName?: string;
firstSeenAt: number;
lastSeenAt: number;
messageCount: number;
}
export interface PresenceEntry {
channel: string;
senderId: string;
senderName?: string;
firstSeenAt: number;
lastSeenAt: number;
messageCount: number;
status: PresenceStatus;
}
interface PresenceQuery {
channel?: string;
status?: PresenceStatus;
limit?: number;
}
interface ChannelRegistryOptions {
offlineAfterMs?: number;
maxPresenceEntries?: number;
}
export class ChannelRegistry {
private adapters: Map<string, ChannelAdapter> = new Map();
private messageHandler?: MessageHandler;
private presence: Map<string, PresenceRecord> = new Map();
private readonly offlineAfterMs: number;
private readonly maxPresenceEntries: number;
constructor(options?: ChannelRegistryOptions) {
this.offlineAfterMs = options?.offlineAfterMs ?? 5 * 60 * 1000;
this.maxPresenceEntries = options?.maxPresenceEntries ?? 10_000;
}
/** Register an adapter. Throws if name already registered. */
register(adapter: ChannelAdapter): void {
@@ -56,6 +96,37 @@ export class ChannelRegistry {
this.messageHandler = handler;
}
/**
* Return observed sender presence entries sorted by most recent activity.
* Status is inferred from lastSeenAt with an inactivity threshold.
*/
getPresence(query?: PresenceQuery): PresenceEntry[] {
const now = Date.now();
const limit = Math.max(1, query?.limit ?? 100);
const entries = Array.from(this.presence.values())
.filter((record) => !query?.channel || record.channel === query.channel)
.map((record): PresenceEntry => {
const status: PresenceStatus = (now - record.lastSeenAt) <= this.offlineAfterMs ? 'online' : 'offline';
return { ...record, status };
})
.filter((entry) => !query?.status || entry.status === query.status)
.sort((a, b) => b.lastSeenAt - a.lastSeenAt);
return entries.slice(0, limit);
}
/** Aggregate quick presence stats for dashboards. */
getPresenceSummary(): { total: number; online: number; offline: number } {
const entries = this.getPresence({ limit: this.maxPresenceEntries });
const online = entries.filter((entry) => entry.status === 'online').length;
return {
total: entries.length,
online,
offline: entries.length - online,
};
}
/** Start all registered adapters. Logs errors per adapter, doesn't throw. */
async startAll(): Promise<void> {
const adapters = Array.from(this.adapters.values());
@@ -92,6 +163,8 @@ export class ChannelRegistry {
/** Internal: route an inbound message to the message handler. */
private handleInbound(msg: InboundMessage): void {
this.recordPresence(msg);
if (!this.messageHandler) {
console.warn(`No message handler set, dropping message from '${msg.channel}'`);
return;
@@ -117,4 +190,42 @@ export class ChannelRegistry {
console.error(`Error handling message from '${msg.channel}':`, err);
});
}
private recordPresence(msg: InboundMessage): void {
const key = `${msg.channel}:${msg.senderId}`;
const now = Date.now();
const existing = this.presence.get(key);
if (existing) {
existing.lastSeenAt = now;
existing.messageCount += 1;
if (msg.senderName) {
existing.senderName = msg.senderName;
}
return;
}
this.presence.set(key, {
channel: msg.channel,
senderId: msg.senderId,
senderName: msg.senderName,
firstSeenAt: now,
lastSeenAt: now,
messageCount: 1,
});
if (this.presence.size > this.maxPresenceEntries) {
// Keep bounded memory by evicting least recently seen entry.
let oldestKey: string | undefined;
let oldestTs = Number.POSITIVE_INFINITY;
for (const [candidateKey, record] of this.presence.entries()) {
if (record.lastSeenAt < oldestTs) {
oldestTs = record.lastSeenAt;
oldestKey = candidateKey;
}
}
if (oldestKey) {
this.presence.delete(oldestKey);
}
}
}
}