/** * MCP Loader Extension * * Reads an `mcp` block from pi's settings.json (same shape as opencode and * Claude Desktop) and connects to each declared server, exposing all of * their tools to pi as native tools. * * Settings.json shape: * * { * "mcp": { * "searxng": { * "type": "local", * "command": ["uvx", "mcp-searxng"], * "env": { "SEARXNG_URL": "https://searxng.your-host.lan" } * }, * "gitea": { * "type": "local", * "command": ["gitea-mcp", "-t", "stdio"], * "enabled": false, * "env": { "GITEA_ACCESS_TOKEN": "...", "GITEA_HOST": "..." } * }, * "context7": { * "type": "remote", * "url": "https://mcp.context7.com/mcp" * } * } * } * * Per-server keys: * type "local" (stdio subprocess) | "remote" (streamable-http) * command argv array — first element is the executable, rest are args. * For local servers only. * url remote MCP endpoint URL. For remote servers only. * enabled default true. Set false to disable a server without * removing it from settings.json. * env optional object of environment variables to inject into * the spawned subprocess. Inherits parent env first, then * overlays these keys. * * Tool naming * * Each MCP tool is registered with pi as `_` to * avoid collisions across servers. If the tool name already begins with * `_` (e.g. mempalace's tools are `mempalace_search` etc.) * the prefix is not duplicated. * * Lifecycle * * • Servers are spawned at extension load time (pi startup or /reload). * • Each server's `tools/list` is awaited before pi finishes registering. * • Subprocess stderr is silenced unless PI_MCP_LOADER_DEBUG=1. * • Subprocesses receive SIGTERM at session_shutdown. * • A server that fails to start (binary missing, init handshake error) * logs a single line to stderr and is skipped. Other servers continue. * Pi keeps working without that server's tools. * * Limitations (v1) * * • Only local/stdio transport is implemented. Remote (streamable-http, * SSE) servers are detected and skipped with a warning. v2 will add * remote support — context7 is the prime motivator. * • No reconnect: if a subprocess dies mid-session, its tools become * unavailable until pi is reloaded. The mempalace.ts extension has * the same limitation today. * * Coexistence with mempalace.ts * * The mempalace.ts extension already spawns mempalace-mcp directly with * bespoke handling (agent identity injection from $MEMPALACE_AGENT_NAME, * wake-up context). This loader does NOT touch mempalace — leave the * bespoke extension in place. If you also list mempalace under the `mcp` * block in settings.json the loader will register a parallel set of * tools, which is harmless but redundant. Don't. */ import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process"; import * as fs from "node:fs"; import * as os from "node:os"; import * as path from "node:path"; import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; import { Type } from "typebox"; // ── MCP types ──────────────────────────────────────────────────────────────── type McpTool = { name: string; description?: string; inputSchema?: Record; }; type LocalServerConfig = { type?: "local"; command: string[]; env?: Record; enabled?: boolean; }; type RemoteServerConfig = { type: "remote"; url: string; enabled?: boolean; }; type ServerConfig = LocalServerConfig | RemoteServerConfig; const SETTINGS_PATH = path.join(os.homedir(), ".pi", "agent", "settings.json"); const DEBUG = process.env.PI_MCP_LOADER_DEBUG === "1"; function dlog(msg: string) { if (DEBUG) process.stderr.write(`[mcp-loader] ${msg}\n`); } function warn(msg: string) { process.stderr.write(`[mcp-loader] ${msg}\n`); } // ── Settings.json reader ───────────────────────────────────────────────────── function readMcpServers(): Record { if (!fs.existsSync(SETTINGS_PATH)) { dlog(`no settings.json at ${SETTINGS_PATH}`); return {}; } let raw: string; try { raw = fs.readFileSync(SETTINGS_PATH, "utf8"); } catch (err) { warn(`could not read ${SETTINGS_PATH}: ${(err as Error).message}`); return {}; } let parsed: any; try { parsed = JSON.parse(raw); } catch (err) { warn(`settings.json parse error: ${(err as Error).message}`); return {}; } const block = parsed?.mcp; if (!block || typeof block !== "object") { dlog("no `mcp` block in settings.json"); return {}; } return block as Record; } // ── Stdio MCP client ───────────────────────────────────────────────────────── class StdioMcpClient { private proc: ChildProcessWithoutNullStreams | null = null; private nextId = 1; private pending = new Map void; reject: (e: Error) => void }>(); private stdoutBuf = ""; private serverName: string; public tools: McpTool[] = []; constructor(serverName: string) { this.serverName = serverName; } async start(command: string, args: string[], env?: Record): Promise { const childEnv = { ...process.env, ...(env ?? {}) }; this.proc = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"], env: childEnv }); this.proc.on("error", (err) => this.failAll(err)); this.proc.on("exit", (code, signal) => { this.failAll(new Error(`${this.serverName}: subprocess exited (code=${code}, signal=${signal})`)); }); this.proc.stdout.setEncoding("utf8"); this.proc.stdout.on("data", (chunk: string) => this.onStdout(chunk)); this.proc.stderr.setEncoding("utf8"); if (DEBUG) { this.proc.stderr.on("data", (chunk: string) => { process.stderr.write(`[${this.serverName} stderr] ${chunk}`); }); } else { this.proc.stderr.resume(); // drain without logging } // MCP initialize handshake await this.request("initialize", { protocolVersion: "2024-11-05", capabilities: {}, clientInfo: { name: "pi-mcp-loader", version: "0.1.0" }, }); this.notify("notifications/initialized", {}); const listed = await this.request("tools/list", {}); this.tools = (listed?.tools as McpTool[]) ?? []; } private onStdout(chunk: string) { this.stdoutBuf += chunk; let nl: number; while ((nl = this.stdoutBuf.indexOf("\n")) !== -1) { const line = this.stdoutBuf.slice(0, nl).trim(); this.stdoutBuf = this.stdoutBuf.slice(nl + 1); if (!line) continue; let msg: any; try { msg = JSON.parse(line); } catch { continue; } if (typeof msg.id === "number" && this.pending.has(msg.id)) { const { resolve, reject } = this.pending.get(msg.id)!; this.pending.delete(msg.id); if (msg.error) reject(new Error(msg.error.message ?? "MCP error")); else resolve(msg.result); } // notifications (no id) ignored. } } private failAll(err: Error) { for (const { reject } of this.pending.values()) reject(err); this.pending.clear(); } private write(obj: unknown) { if (!this.proc) throw new Error(`${this.serverName}: not started`); this.proc.stdin.write(`${JSON.stringify(obj)}\n`); } private notify(method: string, params: unknown) { this.write({ jsonrpc: "2.0", method, params }); } request(method: string, params: unknown): Promise { const id = this.nextId++; return new Promise((resolve, reject) => { this.pending.set(id, { resolve, reject }); this.write({ jsonrpc: "2.0", id, method, params }); }); } async callTool(name: string, args: Record): Promise { return this.request("tools/call", { name, arguments: args }); } stop() { if (this.proc) { try { this.proc.kill("SIGTERM"); } catch {} this.proc = null; } } } // ── MCP result → pi tool result helpers ────────────────────────────────────── function extractText(mcpResult: any): string { if (!mcpResult) return ""; const content = mcpResult.content; if (!Array.isArray(content)) return JSON.stringify(mcpResult); return content .map((c: any) => { if (c?.type === "text" && typeof c.text === "string") return c.text; return JSON.stringify(c); }) .join("\n"); } function namespacedToolName(serverName: string, toolName: string): string { // Avoid double-prefix: if the MCP server already names its tools // `_<...>`, leave them alone. const prefix = `${serverName}_`; if (toolName.startsWith(prefix)) return toolName; return `${prefix}${toolName}`; } // ── Extension entry ────────────────────────────────────────────────────────── export default async function mcpLoaderExtension(pi: ExtensionAPI) { const servers = readMcpServers(); const serverNames = Object.keys(servers); if (serverNames.length === 0) { dlog("no MCP servers configured — nothing to do"); return; } const clients: StdioMcpClient[] = []; for (const [name, cfg] of Object.entries(servers)) { if (cfg.enabled === false) { dlog(`${name}: disabled in settings.json, skipping`); continue; } if (cfg.type === "remote") { warn( `${name}: remote MCP transport not yet supported by mcp-loader (v1 is stdio-only); skipping`, ); continue; } const local = cfg as LocalServerConfig; if (!Array.isArray(local.command) || local.command.length === 0) { warn(`${name}: invalid \`command\` (must be non-empty array), skipping`); continue; } const [bin, ...args] = local.command; const client = new StdioMcpClient(name); try { await client.start(bin, args, local.env); } catch (err) { warn(`${name}: failed to start — ${(err as Error).message}`); client.stop(); continue; } dlog(`${name}: connected, ${client.tools.length} tools`); clients.push(client); // Register each MCP tool as a pi tool. for (const tool of client.tools) { const piName = namespacedToolName(name, tool.name); const schema = tool.inputSchema && typeof tool.inputSchema === "object" ? (Type.Unsafe>( tool.inputSchema as object, ) as unknown as ReturnType) : Type.Object({}, { additionalProperties: true }); pi.registerTool({ name: piName, label: piName, description: tool.description ?? `MCP tool from ${name}`, parameters: schema, async execute(_toolCallId, params) { try { const result = await client.callTool(tool.name, params as Record); const text = extractText(result); return { content: [{ type: "text", text }], details: { server: name, tool: tool.name, raw: result }, }; } catch (err) { return { content: [ { type: "text", text: `MCP call failed (${name}/${tool.name}): ${(err as Error).message}`, }, ], details: { server: name, tool: tool.name, error: (err as Error).message }, isError: true, }; } }, }); } } // Tear down subprocesses on session shutdown so reloads don't leak procs. pi.on("session_shutdown", async () => { for (const c of clients) c.stop(); }); }