diff --git a/config/default.yaml b/config/default.yaml index 5148f1d..07c30e0 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -200,7 +200,7 @@ models: # - set `default` to route normal text turns to a backend by default. # - per-agent override is available via `agent_configs..backend`. # backends: -# default: codex # claude_code | opencode | codex | gemini +# default: codex # claude_code | opencode | codex | gemini | pi_embedded # native: # enabled: true # claude_code: @@ -223,6 +223,13 @@ models: # # path: /usr/local/bin/gemini # # args: ["-p", "{prompt}"] # # timeout_ms: 120000 +# pi_embedded: +# enabled: false +# # timeout_ms: 120000 +# # no_tools_mode: true # keep Pi path text-only in canary; force native for tool-like prompts +# # model: openclaw-default # optional model/session selector passed to Pi runtime +# # system_prompt_mode: hybrid # flynn | pi_default | hybrid +# # module: "@badlogic/pi-agent-core" # optional module override # Optional: Kubernetes / homelab awareness tools (k8s.pods, k8s.deployments, k8s.logs) # k8s: @@ -321,7 +328,7 @@ memory: # assistant: # system_prompt: You are helpful. # model_tier: default -# backend: native # native | codex | claude_code | opencode | gemini +# backend: native # native | codex | claude_code | opencode | gemini | pi_embedded # coder: # system_prompt: Write code. # model_tier: complex diff --git a/src/agents/registry.ts b/src/agents/registry.ts index 55a8d91..07c32ce 100644 --- a/src/agents/registry.ts +++ b/src/agents/registry.ts @@ -5,7 +5,7 @@ export interface AgentConfig { name: string; systemPrompt?: string; modelTier?: ModelTier; - backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini'; + backend?: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded'; toolProfile?: ToolProfile; toolOverrides?: ToolOverrideConfig; sandbox?: boolean; @@ -40,7 +40,7 @@ export class AgentConfigRegistry { loadFromConfig(rawConfigs: Record void } { + const dir = mkdtempSync(join(tmpdir(), 'flynn-pi-embedded-')); + const file = join(dir, 'module.mjs'); + writeFileSync(file, source, 'utf-8'); + return { + moduleUrl: pathToFileURL(file).href, + cleanup: () => rmSync(dir, { recursive: true, force: true }), + }; +} + +describe('PiEmbeddedBackend', () => { + it('returns text from a createAgentSession/run response object', async () => { + const mod = createModule(` + export function createAgentSession() { + return { + run(payload) { + return { text: "pi says: " + payload.input }; + }, + }; + } + `); + + try { + const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 }); + const result = await backend.process({ prompt: 'hello', history: [] }); + expect(result).toBe('pi says: hello'); + } finally { + mod.cleanup(); + } + }); + + it('falls back to string payload when object payload is rejected', async () => { + const mod = createModule(` + export function createAgentSession() { + return { + run(payload) { + if (typeof payload !== "string") { + throw new Error("expected string payload"); + } + return "echo " + payload; + }, + }; + } + `); + + try { + const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 }); + const result = await backend.process({ prompt: 'hello', history: [] }); + expect(result).toContain('echo USER: hello'); + } finally { + mod.cleanup(); + } + }); + + it('throws when module has no supported session factory', async () => { + const mod = createModule('export const version = "0.0.0";'); + + try { + const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 2000 }); + await expect(backend.process({ prompt: 'hello', history: [] })) + .rejects.toThrow('supported session factory'); + } finally { + mod.cleanup(); + } + }); + + it('throws when module cannot be loaded', async () => { + const backend = new PiEmbeddedBackend({ module: '/definitely/missing/pi-module.mjs', timeoutMs: 2000 }); + await expect(backend.process({ prompt: 'hello', history: [] })) + .rejects.toThrow('Failed to load Pi embedded runtime module'); + }); + + it('times out slow Pi requests', async () => { + const mod = createModule(` + export function createAgentSession() { + return { + run() { + return new Promise(() => {}); + }, + }; + } + `); + + try { + const backend = new PiEmbeddedBackend({ module: mod.moduleUrl, timeoutMs: 10 }); + await expect(backend.process({ prompt: 'hello', history: [] })) + .rejects.toThrow('timed out'); + } finally { + mod.cleanup(); + } + }); +}); diff --git a/src/backends/piEmbedded.ts b/src/backends/piEmbedded.ts new file mode 100644 index 0000000..b93b9ed --- /dev/null +++ b/src/backends/piEmbedded.ts @@ -0,0 +1,268 @@ +import type { ExternalBackend, ExternalBackendRequest } from './external.js'; + +const DEFAULT_TIMEOUT_MS = 120_000; +const DEFAULT_MODULE_CANDIDATES = [ + '@badlogic/pi-agent-core', + '@openclaw/pi-agent-core', + 'pi-agent-core', +] as const; + +type PiSystemPromptMode = 'flynn' | 'pi_default' | 'hybrid'; + +interface PiModuleLike { + [key: string]: unknown; +} + +interface PiSessionLike { + [key: string]: unknown; +} + +export interface PiEmbeddedBackendOptions { + timeoutMs?: number; + model?: string; + systemPromptMode?: PiSystemPromptMode; + module?: string; +} + +function buildPrompt(request: ExternalBackendRequest): string { + const lines: string[] = []; + for (const item of request.history) { + if (!item.content.trim()) { + continue; + } + lines.push(`${item.role.toUpperCase()}: ${item.content}`); + } + lines.push(`USER: ${request.prompt}`); + return lines.join('\n\n'); +} + +function isModuleNotFound(error: unknown, moduleName: string): boolean { + if (!(error instanceof Error)) { + return false; + } + const code = (error as NodeJS.ErrnoException).code; + if (code === 'ERR_MODULE_NOT_FOUND' || code === 'MODULE_NOT_FOUND') { + return true; + } + return ( + error.message.includes(`Cannot find package '${moduleName}'`) + || error.message.includes(`Cannot find module '${moduleName}'`) + ); +} + +function toErrorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function withTimeout(promise: Promise, timeoutMs: number, description: string): Promise { + let timeoutHandle: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => { + reject(new Error(`${description} timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }); + + return Promise.race([promise, timeoutPromise]) + .finally(() => { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + }); +} + +function getSessionFactory(moduleLike: PiModuleLike): ((args: Record) => unknown) | undefined { + const candidateNames = [ + 'createAgentSession', + 'createSession', + 'createPiSession', + 'createAgent', + ]; + for (const name of candidateNames) { + const candidate = moduleLike[name]; + if (typeof candidate === 'function') { + return candidate as (args: Record) => unknown; + } + } + return undefined; +} + +function extractText(value: unknown): string | undefined { + if (typeof value === 'string') { + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; + } + if (!value || typeof value !== 'object') { + return undefined; + } + + const obj = value as Record; + const scalarKeys = ['text', 'output', 'response', 'message', 'content']; + for (const key of scalarKeys) { + const scalar = obj[key]; + if (typeof scalar === 'string') { + const trimmed = scalar.trim(); + if (trimmed.length > 0) { + return trimmed; + } + } + } + + const content = obj.content; + if (Array.isArray(content)) { + const textParts = content + .map((part) => { + if (!part || typeof part !== 'object') { + return ''; + } + const textValue = (part as Record).text; + return typeof textValue === 'string' ? textValue.trim() : ''; + }) + .filter((part) => part.length > 0); + if (textParts.length > 0) { + return textParts.join('\n'); + } + } + + return undefined; +} + +async function maybeDisposeSession(session: unknown): Promise { + if (!session || typeof session !== 'object') { + return; + } + const obj = session as PiSessionLike; + const disposeMethods = ['close', 'dispose', 'destroy']; + for (const methodName of disposeMethods) { + const method = obj[methodName]; + if (typeof method === 'function') { + await Promise.resolve(method.call(session)); + return; + } + } +} + +export class PiEmbeddedBackend implements ExternalBackend { + readonly name = 'pi_embedded' as const; + private readonly timeoutMs: number; + private readonly model?: string; + private readonly systemPromptMode: PiSystemPromptMode; + private readonly moduleOverride?: string; + + constructor(options: PiEmbeddedBackendOptions = {}) { + this.timeoutMs = options.timeoutMs ?? DEFAULT_TIMEOUT_MS; + this.model = options.model; + this.systemPromptMode = options.systemPromptMode ?? 'hybrid'; + this.moduleOverride = options.module; + } + + async process(input: ExternalBackendRequest): Promise { + const prompt = buildPrompt(input); + const moduleLike = await this.loadPiModule(); + const factory = getSessionFactory(moduleLike); + if (!factory) { + throw new Error( + 'Loaded Pi module does not expose a supported session factory ' + + '(expected one of: createAgentSession, createSession, createPiSession, createAgent)', + ); + } + + const requestPayload: Record = { + prompt, + input: input.prompt, + history: input.history, + messages: [ + ...input.history.map((entry) => ({ role: entry.role, content: entry.content })), + { role: 'user', content: input.prompt }, + ], + ...(this.model ? { model: this.model } : {}), + systemPromptMode: this.systemPromptMode, + }; + + const session = await withTimeout( + Promise.resolve(factory(requestPayload)), + this.timeoutMs, + 'Pi embedded session initialization', + ); + + try { + const response = await withTimeout( + this.invokeSession(session, requestPayload, prompt), + this.timeoutMs, + 'Pi embedded request', + ); + const text = extractText(response); + if (!text) { + throw new Error('Pi embedded backend returned no text output'); + } + return text; + } finally { + await maybeDisposeSession(session); + } + } + + private async loadPiModule(): Promise { + const candidates = this.moduleOverride + ? [this.moduleOverride] + : [...DEFAULT_MODULE_CANDIDATES]; + const failures: string[] = []; + + for (const moduleName of candidates) { + try { + const loaded = await import(moduleName); + return loaded as PiModuleLike; + } catch (error) { + if (isModuleNotFound(error, moduleName)) { + failures.push(`${moduleName}: not installed`); + continue; + } + failures.push(`${moduleName}: ${toErrorMessage(error)}`); + } + } + + throw new Error( + 'Failed to load Pi embedded runtime module. ' + + `Tried: ${candidates.join(', ')}. ` + + `Details: ${failures.join(' | ')}`, + ); + } + + private async invokeSession( + session: unknown, + requestPayload: Record, + prompt: string, + ): Promise { + if (typeof session === 'function') { + return Promise.resolve(session(requestPayload)); + } + if (!session || typeof session !== 'object') { + throw new Error('Pi session factory returned an invalid session object'); + } + + const sessionObj = session as PiSessionLike; + const methodNames = ['run', 'sendMessage', 'chat', 'complete', 'invoke', 'process']; + for (const methodName of methodNames) { + const method = sessionObj[methodName]; + if (typeof method !== 'function') { + continue; + } + + const payloadAttempts: unknown[] = [requestPayload, prompt]; + let lastError: unknown; + for (const payload of payloadAttempts) { + try { + return await Promise.resolve(method.call(session, payload)); + } catch (error) { + lastError = error; + } + } + throw new Error( + `Pi session method "${methodName}" failed: ${toErrorMessage(lastError)}`, + ); + } + + throw new Error( + 'Pi session object does not expose a supported invocation method ' + + '(expected one of: run, sendMessage, chat, complete, invoke, process)', + ); + } +} diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 7d9e6b1..60b439e 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -403,12 +403,18 @@ describe('configSchema — agent_configs', () => { tool_profile: 'coding', sandbox: true, }, + pi_canary: { + model_tier: 'default', + backend: 'pi_embedded', + tool_profile: 'messaging', + }, }, }); expect(result.agent_configs.assistant.system_prompt).toBe('You are helpful.'); expect(result.agent_configs.assistant.backend).toBe('codex'); expect(result.agent_configs.assistant.tool_profile).toBe('messaging'); expect(result.agent_configs.coder.sandbox).toBe(true); + expect(result.agent_configs.pi_canary.backend).toBe('pi_embedded'); }); }); @@ -500,20 +506,31 @@ describe('configSchema — backends', () => { expect(result.backends.opencode.args).toEqual([]); expect(result.backends.codex.enabled).toBe(false); expect(result.backends.gemini.enabled).toBe(false); + expect(result.backends.pi_embedded.enabled).toBe(false); + expect(result.backends.pi_embedded.no_tools_mode).toBe(true); + expect(result.backends.pi_embedded.system_prompt_mode).toBe('hybrid'); expect(result.backends.native.enabled).toBe(true); }); - it('accepts explicit codex/gemini backend config', () => { + it('accepts explicit codex/gemini/pi_embedded backend config', () => { const result = configSchema.parse({ ...minimalConfig, backends: { - default: 'codex', + default: 'pi_embedded', codex: { enabled: true, path: '/usr/local/bin/codex', args: ['run'], timeout_ms: 300000 }, gemini: { enabled: true, path: '/usr/local/bin/gemini', args: ['chat'], timeout_ms: 60000 }, + pi_embedded: { + enabled: true, + timeout_ms: 45000, + no_tools_mode: false, + model: 'openclaw-default', + system_prompt_mode: 'flynn', + module: '@badlogic/pi-agent-core', + }, }, }); - expect(result.backends.default).toBe('codex'); + expect(result.backends.default).toBe('pi_embedded'); expect(result.backends.codex.enabled).toBe(true); expect(result.backends.codex.path).toBe('/usr/local/bin/codex'); expect(result.backends.codex.args).toEqual(['run']); @@ -522,6 +539,12 @@ describe('configSchema — backends', () => { expect(result.backends.gemini.path).toBe('/usr/local/bin/gemini'); expect(result.backends.gemini.args).toEqual(['chat']); expect(result.backends.gemini.timeout_ms).toBe(60000); + expect(result.backends.pi_embedded.enabled).toBe(true); + expect(result.backends.pi_embedded.timeout_ms).toBe(45000); + expect(result.backends.pi_embedded.no_tools_mode).toBe(false); + expect(result.backends.pi_embedded.model).toBe('openclaw-default'); + expect(result.backends.pi_embedded.system_prompt_mode).toBe('flynn'); + expect(result.backends.pi_embedded.module).toBe('@badlogic/pi-agent-core'); }); }); diff --git a/src/config/schema.ts b/src/config/schema.ts index a754e70..66f627d 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -184,7 +184,7 @@ const modelsSchema = z.object({ }); const backendsSchema = z.object({ - default: z.enum(['claude_code', 'opencode', 'codex', 'gemini']).optional(), + default: z.enum(['claude_code', 'opencode', 'codex', 'gemini', 'pi_embedded']).optional(), claude_code: z.object({ enabled: z.boolean().default(false), path: z.string().optional(), @@ -209,6 +209,14 @@ const backendsSchema = z.object({ args: z.array(z.string()).default([]), timeout_ms: z.number().min(1_000).max(600_000).default(120_000), }).default({ enabled: false }), + pi_embedded: z.object({ + enabled: z.boolean().default(false), + timeout_ms: z.number().min(1_000).max(600_000).default(120_000), + no_tools_mode: z.boolean().default(true), + model: z.string().optional(), + system_prompt_mode: z.enum(['flynn', 'pi_default', 'hybrid']).default('hybrid'), + module: z.string().optional(), + }).default({ enabled: false }), native: z.object({ enabled: z.boolean().default(true), }).default({ enabled: true }), @@ -853,7 +861,7 @@ const sandboxSchema = z.object({ const agentConfigEntrySchema = z.object({ system_prompt: z.string().optional(), model_tier: modelTierEnum.optional(), - backend: z.enum(['native', 'claude_code', 'opencode', 'codex', 'gemini']).optional(), + backend: z.enum(['native', 'claude_code', 'opencode', 'codex', 'gemini', 'pi_embedded']).optional(), tool_profile: toolProfileEnum.optional(), tool_overrides: toolOverrideSchema.optional(), sandbox: z.boolean().default(false), diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 7be7990..47ed9c1 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -39,6 +39,7 @@ import { OpenCodeBackend, CodexBackend, GeminiBackend, + PiEmbeddedBackend, type ExternalBackend, type ExternalBackendName, } from '../backends/index.js'; @@ -77,6 +78,14 @@ function createConfiguredExternalBackends(config: Config): { config.backends.gemini.timeout_ms, ); } + if (config.backends.pi_embedded.enabled) { + backends.pi_embedded = new PiEmbeddedBackend({ + timeoutMs: config.backends.pi_embedded.timeout_ms, + model: config.backends.pi_embedded.model, + systemPromptMode: config.backends.pi_embedded.system_prompt_mode, + module: config.backends.pi_embedded.module, + }); + } const selectedDefault = config.backends.default; const defaultName = selectedDefault && backends[selectedDefault]