feat(backends): add optional pi_embedded backend and config wiring

This commit is contained in:
William Valentin
2026-02-23 21:12:52 -08:00
parent 0af44330b5
commit ac61c9c3fb
9 changed files with 426 additions and 10 deletions
+9 -2
View File
@@ -200,7 +200,7 @@ models:
# - set `default` to route normal text turns to a backend by default.
# - per-agent override is available via `agent_configs.<name>.backend`.
# backends:
# default: codex # claude_code | opencode | codex | gemini
# default: codex # claude_code | opencode | codex | gemini | pi_embedded
# native:
# enabled: true
# claude_code:
@@ -223,6 +223,13 @@ models:
# # path: /usr/local/bin/gemini
# # args: ["-p", "{prompt}"]
# # timeout_ms: 120000
# pi_embedded:
# enabled: false
# # timeout_ms: 120000
# # no_tools_mode: true # keep Pi path text-only in canary; force native for tool-like prompts
# # model: openclaw-default # optional model/session selector passed to Pi runtime
# # system_prompt_mode: hybrid # flynn | pi_default | hybrid
# # module: "@badlogic/pi-agent-core" # optional module override
# Optional: Kubernetes / homelab awareness tools (k8s.pods, k8s.deployments, k8s.logs)
# k8s:
@@ -321,7 +328,7 @@ memory:
# assistant:
# system_prompt: You are helpful.
# model_tier: default
# backend: native # native | codex | claude_code | opencode | gemini
# backend: native # native | codex | claude_code | opencode | gemini | pi_embedded
# coder:
# system_prompt: Write code.
# model_tier: complex
+2 -2
View File
@@ -5,7 +5,7 @@ export interface AgentConfig {
name: string;
systemPrompt?: string;
modelTier?: ModelTier;
backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini';
backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
toolProfile?: ToolProfile;
toolOverrides?: ToolOverrideConfig;
sandbox?: boolean;
@@ -40,7 +40,7 @@ export class AgentConfigRegistry {
loadFromConfig(rawConfigs: Record<string, {
system_prompt?: string;
model_tier?: string;
backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini';
backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
tool_profile?: string;
tool_overrides?: ToolOverrideConfig;
sandbox?: boolean;
+1 -1
View File
@@ -1,6 +1,6 @@
import { execFile } from 'child_process';
export type ExternalBackendName = 'claude_code' | 'opencode' | 'codex' | 'gemini';
export type ExternalBackendName = 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
export interface ExternalBackendRequest {
prompt: string;
+1
View File
@@ -23,3 +23,4 @@ export {
type ExternalBackendName,
type ExternalBackendRequest,
} from './external.js';
export { PiEmbeddedBackend, type PiEmbeddedBackendOptions } from './piEmbedded.js';
+100
View File
@@ -0,0 +1,100 @@
import { mkdtempSync, rmSync, writeFileSync } from 'fs';
import { join } from 'path';
import { tmpdir } from 'os';
import { pathToFileURL } from 'url';
import { describe, expect, it } from 'vitest';
import { PiEmbeddedBackend } from './piEmbedded.js';
function createModule(source: string): { moduleUrl: string; cleanup: () => void } {
const dir = mkdtempSync(join(tmpdir(), 'flynn-pi-embedded-'));
const file = join(dir, 'module.mjs');
writeFileSync(file, source, 'utf-8');
return {
moduleUrl: pathToFileURL(file).href,
cleanup: () => rmSync(dir, { recursive: true, force: true }),
};
}
describe('PiEmbeddedBackend', () => {
it('returns text from a createAgentSession/run response object', async () => {
const mod = createModule(`
export function createAgentSession() {
return {
run(payload) {
return { text: "pi says: " + payload.input };
},
};
}
`);
try {
const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 });
const result = await backend.process({ prompt: 'hello', history: [] });
expect(result).toBe('pi says: hello');
} finally {
mod.cleanup();
}
});
it('falls back to string payload when object payload is rejected', async () => {
const mod = createModule(`
export function createAgentSession() {
return {
run(payload) {
if (typeof payload !== "string") {
throw new Error("expected string payload");
}
return "echo " + payload;
},
};
}
`);
try {
const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 });
const result = await backend.process({ prompt: 'hello', history: [] });
expect(result).toContain('echo USER: hello');
} finally {
mod.cleanup();
}
});
it('throws when module has no supported session factory', async () => {
const mod = createModule('export const version = "0.0.0";');
try {
const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 });
await expect(backend.process({ prompt: 'hello', history: [] }))
.rejects.toThrow('supported session factory');
} finally {
mod.cleanup();
}
});
it('throws when module cannot be loaded', async () => {
const backend = new PiEmbeddedBackend({ module: '/definitely/missing/pi-module.mjs', timeoutMs: 2000 });
await expect(backend.process({ prompt: 'hello', history: [] }))
.rejects.toThrow('Failed to load Pi embedded runtime module');
});
it('times out slow Pi requests', async () => {
const mod = createModule(`
export function createAgentSession() {
return {
run() {
return new Promise(() => {});
},
};
}
`);
try {
const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 10 });
await expect(backend.process({ prompt: 'hello', history: [] }))
.rejects.toThrow('timed out');
} finally {
mod.cleanup();
}
});
});
+268
View File
@@ -0,0 +1,268 @@
import type { ExternalBackend, ExternalBackendRequest } from './external.js';
const DEFAULT_TIMEOUT_MS = 120_000;
const DEFAULT_MODULE_CANDIDATES = [
'@badlogic/pi-agent-core',
'@openclaw/pi-agent-core',
'pi-agent-core',
] as const;
type PiSystemPromptMode = 'flynn' | 'pi_default' | 'hybrid';
interface PiModuleLike {
[key: string]: unknown;
}
interface PiSessionLike {
[key: string]: unknown;
}
export interface PiEmbeddedBackendOptions {
timeoutMs?: number;
model?: string;
systemPromptMode?: PiSystemPromptMode;
module?: string;
}
function buildPrompt(request: ExternalBackendRequest): string {
const lines: string[] = [];
for (const item of request.history) {
if (!item.content.trim()) {
continue;
}
lines.push(`${item.role.toUpperCase()}: ${item.content}`);
}
lines.push(`USER: ${request.prompt}`);
return lines.join('\n\n');
}
function isModuleNotFound(error: unknown, moduleName: string): boolean {
if (!(error instanceof Error)) {
return false;
}
const code = (error as NodeJS.ErrnoException).code;
if (code === 'ERR_MODULE_NOT_FOUND' || code === 'MODULE_NOT_FOUND') {
return true;
}
return (
error.message.includes(`Cannot find package '${moduleName}'`)
|| error.message.includes(`Cannot find module '${moduleName}'`)
);
}
function toErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function withTimeout<T>(promise: Promise<T>, timeoutMs: number, description: string): Promise<T> {
let timeoutHandle: NodeJS.Timeout | undefined;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutHandle = setTimeout(() => {
reject(new Error(`${description} timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
return Promise.race([promise, timeoutPromise])
.finally(() => {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
});
}
function getSessionFactory(moduleLike: PiModuleLike): ((args: Record<string, unknown>) => unknown) | undefined {
const candidateNames = [
'createAgentSession',
'createSession',
'createPiSession',
'createAgent',
];
for (const name of candidateNames) {
const candidate = moduleLike[name];
if (typeof candidate === 'function') {
return candidate as (args: Record<string, unknown>) => unknown;
}
}
return undefined;
}
function extractText(value: unknown): string | undefined {
if (typeof value === 'string') {
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
if (!value || typeof value !== 'object') {
return undefined;
}
const obj = value as Record<string, unknown>;
const scalarKeys = ['text', 'output', 'response', 'message', 'content'];
for (const key of scalarKeys) {
const scalar = obj[key];
if (typeof scalar === 'string') {
const trimmed = scalar.trim();
if (trimmed.length > 0) {
return trimmed;
}
}
}
const content = obj.content;
if (Array.isArray(content)) {
const textParts = content
.map((part) => {
if (!part || typeof part !== 'object') {
return '';
}
const textValue = (part as Record<string, unknown>).text;
return typeof textValue === 'string' ? textValue.trim() : '';
})
.filter((part) => part.length > 0);
if (textParts.length > 0) {
return textParts.join('\n');
}
}
return undefined;
}
async function maybeDisposeSession(session: unknown): Promise<void> {
if (!session || typeof session !== 'object') {
return;
}
const obj = session as PiSessionLike;
const disposeMethods = ['close', 'dispose', 'destroy'];
for (const methodName of disposeMethods) {
const method = obj[methodName];
if (typeof method === 'function') {
await Promise.resolve(method.call(session));
return;
}
}
}
export class PiEmbeddedBackend implements ExternalBackend {
readonly name = 'pi_embedded' as const;
private readonly timeoutMs: number;
private readonly model?: string;
private readonly systemPromptMode: PiSystemPromptMode;
private readonly moduleOverride?: string;
constructor(options: PiEmbeddedBackendOptions = {}) {
this.timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS;
this.model = options.model;
this.systemPromptMode = options.systemPromptMode ?? 'hybrid';
this.moduleOverride = options.module;
}
async process(input: ExternalBackendRequest): Promise<string> {
const prompt = buildPrompt(input);
const moduleLike = await this.loadPiModule();
const factory = getSessionFactory(moduleLike);
if (!factory) {
throw new Error(
'Loaded Pi module does not expose a supported session factory ' +
'(expected one of: createAgentSession, createSession, createPiSession, createAgent)',
);
}
const requestPayload: Record<string, unknown> = {
prompt,
input: input.prompt,
history: input.history,
messages: [
...input.history.map((entry) => ({ role: entry.role, content: entry.content })),
{ role: 'user', content: input.prompt },
],
...(this.model ? { model: this.model } : {}),
systemPromptMode: this.systemPromptMode,
};
const session = await withTimeout(
Promise.resolve(factory(requestPayload)),
this.timeoutMs,
'Pi embedded session initialization',
);
try {
const response = await withTimeout(
this.invokeSession(session, requestPayload, prompt),
this.timeoutMs,
'Pi embedded request',
);
const text = extractText(response);
if (!text) {
throw new Error('Pi embedded backend returned no text output');
}
return text;
} finally {
await maybeDisposeSession(session);
}
}
private async loadPiModule(): Promise<PiModuleLike> {
const candidates = this.moduleOverride
? [this.moduleOverride]
: [...DEFAULT_MODULE_CANDIDATES];
const failures: string[] = [];
for (const moduleName of candidates) {
try {
const loaded = await import(moduleName);
return loaded as PiModuleLike;
} catch (error) {
if (isModuleNotFound(error, moduleName)) {
failures.push(`${moduleName}: not installed`);
continue;
}
failures.push(`${moduleName}: ${toErrorMessage(error)}`);
}
}
throw new Error(
'Failed to load Pi embedded runtime module. ' +
`Tried: ${candidates.join(', ')}. ` +
`Details: ${failures.join(' | ')}`,
);
}
private async invokeSession(
session: unknown,
requestPayload: Record<string, unknown>,
prompt: string,
): Promise<unknown> {
if (typeof session === 'function') {
return Promise.resolve(session(requestPayload));
}
if (!session || typeof session !== 'object') {
throw new Error('Pi session factory returned an invalid session object');
}
const sessionObj = session as PiSessionLike;
const methodNames = ['run', 'sendMessage', 'chat', 'complete', 'invoke', 'process'];
for (const methodName of methodNames) {
const method = sessionObj[methodName];
if (typeof method !== 'function') {
continue;
}
const payloadAttempts: unknown[] = [requestPayload, prompt];
let lastError: unknown;
for (const payload of payloadAttempts) {
try {
return await Promise.resolve(method.call(session, payload));
} catch (error) {
lastError = error;
}
}
throw new Error(
`Pi session method "${methodName}" failed: ${toErrorMessage(lastError)}`,
);
}
throw new Error(
'Pi session object does not expose a supported invocation method ' +
'(expected one of: run, sendMessage, chat, complete, invoke, process)',
);
}
}
+26 -3
View File
@@ -403,12 +403,18 @@ describe('configSchema — agent_configs', () => {
tool_profile: 'coding',
sandbox: true,
},
pi_canary: {
model_tier: 'default',
backend: 'pi_embedded',
tool_profile: 'messaging',
},
},
});
expect(result.agent_configs.assistant.system_prompt).toBe('You are helpful.');
expect(result.agent_configs.assistant.backend).toBe('codex');
expect(result.agent_configs.assistant.tool_profile).toBe('messaging');
expect(result.agent_configs.coder.sandbox).toBe(true);
expect(result.agent_configs.pi_canary.backend).toBe('pi_embedded');
});
});
@@ -500,20 +506,31 @@ describe('configSchema — backends', () => {
expect(result.backends.opencode.args).toEqual([]);
expect(result.backends.codex.enabled).toBe(false);
expect(result.backends.gemini.enabled).toBe(false);
expect(result.backends.pi_embedded.enabled).toBe(false);
expect(result.backends.pi_embedded.no_tools_mode).toBe(true);
expect(result.backends.pi_embedded.system_prompt_mode).toBe('hybrid');
expect(result.backends.native.enabled).toBe(true);
});
it('accepts explicit codex/gemini backend config', () => {
it('accepts explicit codex/gemini/pi_embedded backend config', () => {
const result = configSchema.parse({
...minimalConfig,
backends: {
default: 'codex',
default: 'pi_embedded',
codex: { enabled: true, path: '/usr/local/bin/codex', args: ['run'], timeout_ms: 300000 },
gemini: { enabled: true, path: '/usr/local/bin/gemini', args: ['chat'], timeout_ms: 60000 },
pi_embedded: {
enabled: true,
timeout_ms: 45000,
no_tools_mode: false,
model: 'openclaw-default',
system_prompt_mode: 'flynn',
module: '@badlogic/pi-agent-core',
},
},
});
expect(result.backends.default).toBe('codex');
expect(result.backends.default).toBe('pi_embedded');
expect(result.backends.codex.enabled).toBe(true);
expect(result.backends.codex.path).toBe('/usr/local/bin/codex');
expect(result.backends.codex.args).toEqual(['run']);
@@ -522,6 +539,12 @@ describe('configSchema — backends', () => {
expect(result.backends.gemini.path).toBe('/usr/local/bin/gemini');
expect(result.backends.gemini.args).toEqual(['chat']);
expect(result.backends.gemini.timeout_ms).toBe(60000);
expect(result.backends.pi_embedded.enabled).toBe(true);
expect(result.backends.pi_embedded.timeout_ms).toBe(45000);
expect(result.backends.pi_embedded.no_tools_mode).toBe(false);
expect(result.backends.pi_embedded.model).toBe('openclaw-default');
expect(result.backends.pi_embedded.system_prompt_mode).toBe('flynn');
expect(result.backends.pi_embedded.module).toBe('@badlogic/pi-agent-core');
});
});
+10 -2
View File
@@ -184,7 +184,7 @@ const modelsSchema = z.object({
});
const backendsSchema = z.object({
default: z.enum(['claude_code', 'opencode', 'codex', 'gemini']).optional(),
default: z.enum(['claude_code', 'opencode', 'codex', 'gemini', 'pi_embedded']).optional(),
claude_code: z.object({
enabled: z.boolean().default(false),
path: z.string().optional(),
@@ -209,6 +209,14 @@ const backendsSchema = z.object({
args: z.array(z.string()).default([]),
timeout_ms: z.number().min(1_000).max(600_000).default(120_000),
}).default({ enabled: false }),
pi_embedded: z.object({
enabled: z.boolean().default(false),
timeout_ms: z.number().min(1_000).max(600_000).default(120_000),
no_tools_mode: z.boolean().default(true),
model: z.string().optional(),
system_prompt_mode: z.enum(['flynn', 'pi_default', 'hybrid']).default('hybrid'),
module: z.string().optional(),
}).default({ enabled: false }),
native: z.object({
enabled: z.boolean().default(true),
}).default({ enabled: true }),
@@ -853,7 +861,7 @@ const sandboxSchema = z.object({
const agentConfigEntrySchema = z.object({
system_prompt: z.string().optional(),
model_tier: modelTierEnum.optional(),
backend: z.enum(['native', 'claude_code', 'opencode', 'codex', 'gemini']).optional(),
backend: z.enum(['native', 'claude_code', 'opencode', 'codex', 'gemini', 'pi_embedded']).optional(),
tool_profile: toolProfileEnum.optional(),
tool_overrides: toolOverrideSchema.optional(),
sandbox: z.boolean().default(false),
+9
View File
@@ -39,6 +39,7 @@ import {
OpenCodeBackend,
CodexBackend,
GeminiBackend,
PiEmbeddedBackend,
type ExternalBackend,
type ExternalBackendName,
} from '../backends/index.js';
@@ -77,6 +78,14 @@ function createConfiguredExternalBackends(config: Config): {
config.backends.gemini.timeout_ms,
);
}
if (config.backends.pi_embedded.enabled) {
backends.pi_embedded = new PiEmbeddedBackend({
timeoutMs: config.backends.pi_embedded.timeout_ms,
model: config.backends.pi_embedded.model,
systemPromptMode: config.backends.pi_embedded.system_prompt_mode,
module: config.backends.pi_embedded.module,
});
}
const selectedDefault = config.backends.default;
const defaultName = selectedDefault && backends[selectedDefault]