Files
pi-extensions/extensions/mcp-loader.ts
T
joakimp 7eec49b9b8 mcp-loader: add /mcp slash command for runtime status + toggle
Mirrors /ext UX (space=stage, enter=apply+reload, esc=cancel) but for
MCP servers in the settings.json `mcp` block. Tracks per-server runtime
state captured at extension load time so users can see at a glance
which servers are running / failed / disabled / remote-skipped /
invalid, with tool counts for the running ones.

Toggling writes back to settings.json — disabling sets enabled:false,
re-enabling removes the explicit key (default is true) to keep the
file tidy. Then ctx.reload() picks up the change.

Closes the visibility gap surfaced by 'searxng_search isn't in /ext':
MCP-provided tools are runtime-spawned, not file-based extensions, so
they need their own list view. /mcp fills that hole.
2026-05-08 21:05:09 +02:00

597 lines
20 KiB
TypeScript

/**
* MCP Loader Extension
*
* Reads an `mcp` block from pi's settings.json (same shape as opencode and
* Claude Desktop) and connects to each declared server, exposing all of
* their tools to pi as native tools.
*
* Settings.json shape:
*
* {
* "mcp": {
* "searxng": {
* "type": "local",
* "command": ["uvx", "mcp-searxng"],
* "env": { "SEARXNG_URL": "https://searxng.your-host.lan" }
* },
* "gitea": {
* "type": "local",
* "command": ["gitea-mcp", "-t", "stdio"],
* "enabled": false,
* "env": { "GITEA_ACCESS_TOKEN": "...", "GITEA_HOST": "..." }
* },
* "context7": {
* "type": "remote",
* "url": "https://mcp.context7.com/mcp"
* }
* }
* }
*
* Per-server keys:
* type "local" (stdio subprocess) | "remote" (streamable-http)
* command argv array — first element is the executable, rest are args.
* For local servers only.
* url remote MCP endpoint URL. For remote servers only.
* enabled default true. Set false to disable a server without
* removing it from settings.json.
* env optional object of environment variables to inject into
* the spawned subprocess. Inherits parent env first, then
* overlays these keys.
*
* Tool naming
*
* Each MCP tool is registered with pi as `<server-name>_<tool-name>` to
* avoid collisions across servers. If the tool name already begins with
* `<server-name>_` (e.g. mempalace's tools are `mempalace_search` etc.)
* the prefix is not duplicated.
*
* Lifecycle
*
* • Servers are spawned at extension load time (pi startup or /reload).
* • Each server's `tools/list` is awaited before pi finishes registering.
* • Subprocess stderr is silenced unless PI_MCP_LOADER_DEBUG=1.
* • Subprocesses receive SIGTERM at session_shutdown.
* • A server that fails to start (binary missing, init handshake error)
* logs a single line to stderr and is skipped. Other servers continue.
* Pi keeps working without that server's tools.
*
* Limitations (v1)
*
* • Only local/stdio transport is implemented. Remote (streamable-http,
* SSE) servers are detected and skipped with a warning. v2 will add
* remote support — context7 is the prime motivator.
* • No reconnect: if a subprocess dies mid-session, its tools become
* unavailable until pi is reloaded. The mempalace.ts extension has
* the same limitation today.
*
* Coexistence with mempalace.ts
*
* The mempalace.ts extension already spawns mempalace-mcp directly with
* bespoke handling (agent identity injection from $MEMPALACE_AGENT_NAME,
* wake-up context). This loader does NOT touch mempalace — leave the
* bespoke extension in place. If you also list mempalace under the `mcp`
* block in settings.json the loader will register a parallel set of
* tools, which is harmless but redundant. Don't.
*
* Slash command
*
* `/mcp` lists configured MCP servers with their runtime status
* (running / failed / disabled / remote-skipped / invalid) and lets
* you toggle the `enabled` flag in settings.json. Same UX as `/ext`:
* space stages, enter applies + reloads, esc cancels.
*/
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import {
type ExtensionAPI,
getSettingsListTheme,
} from "@mariozechner/pi-coding-agent";
import {
Container,
Key,
matchesKey,
type SettingItem,
SettingsList,
} from "@mariozechner/pi-tui";
import { Type } from "typebox";
// ── MCP types ────────────────────────────────────────────────────────────────
/** A tool descriptor as read from the `tools` array of a server's `tools/list` result. */
type McpTool = {
// Name as reported by the server; the loader namespaces it before registering with pi.
name: string;
// Optional; the loader substitutes a generic description when absent.
description?: string;
// Optional; when present and an object it is passed to pi as the tool's parameter schema.
inputSchema?: Record<string, unknown>;
};
/** Settings entry for a server run as a local stdio subprocess. */
type LocalServerConfig = {
// Optional: the loader treats every entry whose type is not "remote" as local.
type?: "local";
// argv array: first element is the executable, the remaining elements are its arguments.
command: string[];
// Extra environment variables, overlaid on top of the parent process environment.
env?: Record<string, string>;
// Only an explicit `false` disables the server; an absent key means enabled.
enabled?: boolean;
};
/** Settings entry for a remote server. The loader detects these and skips them (v1 is stdio-only). */
type RemoteServerConfig = {
type: "remote";
// Remote endpoint URL. Not read anywhere in the loader code visible here.
url: string;
// Only an explicit `false` disables the server; an absent key means enabled.
enabled?: boolean;
};
/** Any entry of the settings.json `mcp` block. */
type ServerConfig = LocalServerConfig | RemoteServerConfig;
/** Absolute path of pi's settings file: `<home>/.pi/agent/settings.json`. */
const SETTINGS_PATH = path.join(os.homedir(), ".pi", "agent", "settings.json");

/** Verbose logging switch: on only when PI_MCP_LOADER_DEBUG is exactly "1". */
const DEBUG = "1" === process.env.PI_MCP_LOADER_DEBUG;

/** Debug-level log line on stderr; a no-op unless DEBUG is on. */
function dlog(msg: string) {
  if (!DEBUG) return;
  process.stderr.write(`[mcp-loader] ${msg}\n`);
}

/** Unconditional log line on stderr, using the same prefix as dlog. */
function warn(msg: string) {
  const line = `[mcp-loader] ${msg}\n`;
  process.stderr.write(line);
}
// ── Settings.json reader ─────────────────────────────────────────────────────
/**
 * Load the `mcp` block from settings.json.
 *
 * Best-effort: a missing file, an unreadable file, malformed JSON, or a
 * missing/unusable `mcp` value all yield an empty record instead of throwing,
 * so callers can treat "nothing configured" and "could not read" alike.
 *
 * @returns Server name → config exactly as written in the file. Individual
 *   entries are NOT validated here; callers must check each entry's shape.
 */
function readMcpServers(): Record<string, ServerConfig> {
  if (!fs.existsSync(SETTINGS_PATH)) {
    dlog(`no settings.json at ${SETTINGS_PATH}`);
    return {};
  }

  let raw: string;
  try {
    raw = fs.readFileSync(SETTINGS_PATH, "utf8");
  } catch (err) {
    warn(`could not read ${SETTINGS_PATH}: ${(err as Error).message}`);
    return {};
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    warn(`settings.json parse error: ${(err as Error).message}`);
    return {};
  }

  const block: unknown =
    typeof parsed === "object" && parsed !== null ? (parsed as { mcp?: unknown }).mcp : undefined;

  // `typeof [] === "object"`, so arrays need an explicit rejection — otherwise
  // their indices ("0", "1", …) would be treated as server names.
  if (Array.isArray(block)) {
    warn("`mcp` in settings.json must be an object keyed by server name, not an array; ignoring");
    return {};
  }
  if (!block || typeof block !== "object") {
    dlog("no `mcp` block in settings.json");
    return {};
  }
  return block as Record<string, ServerConfig>;
}
// ── Stdio MCP client ─────────────────────────────────────────────────────────
/**
 * Minimal MCP client that talks newline-delimited JSON-RPC 2.0 to a child
 * process over its stdin/stdout.
 *
 * Lifecycle: `start()` spawns the process, performs the `initialize`
 * handshake and caches the `tools/list` result in `tools`. Once the process
 * has exited, failed to spawn, or `stop()` has been called, the client is
 * closed: in-flight requests are rejected and every later `request()` rejects
 * immediately instead of waiting for a reply that can never arrive.
 */
class StdioMcpClient {
  private proc: ChildProcessWithoutNullStreams | null = null;
  private nextId = 1;
  private pending = new Map<number, { resolve: (v: any) => void; reject: (e: Error) => void }>();
  private stdoutBuf = "";
  private serverName: string;
  /** Why the client is closed; null while it is usable (or not yet started). */
  private closedReason: Error | null = null;
  public tools: McpTool[] = [];

  constructor(serverName: string) {
    this.serverName = serverName;
  }

  /**
   * Spawn the server and complete the MCP handshake.
   *
   * @param command Executable to run.
   * @param args Arguments passed to the executable.
   * @param env Extra environment variables, overlaid on the parent environment.
   * @throws (rejects) if the process cannot be spawned, exits during the
   *   handshake, or answers a handshake request with a JSON-RPC error.
   */
  async start(command: string, args: string[], env?: Record<string, string>): Promise<void> {
    const childEnv = { ...process.env, ...(env ?? {}) };
    const proc = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"], env: childEnv });
    this.proc = proc;
    this.closedReason = null;

    proc.on("error", (err: Error) => {
      if (this.proc !== proc) return;
      // No pid means the spawn itself failed, which is terminal. Other 'error'
      // events (e.g. a failed kill) only fail what is currently in flight.
      if (proc.pid === undefined) this.markClosed(err);
      else this.failAll(err);
    });
    proc.on("exit", (code, signal) => {
      if (this.proc !== proc) return;
      this.markClosed(
        new Error(`${this.serverName}: subprocess exited (code=${code}, signal=${signal})`),
      );
    });
    // Without a listener, an EPIPE on stdin is an unhandled 'error' event and
    // would take down the host process.
    proc.stdin.on("error", (err: Error) => {
      if (this.proc !== proc) return;
      this.markClosed(err);
    });

    proc.stdout.setEncoding("utf8");
    proc.stdout.on("data", (chunk: string) => this.onStdout(chunk));
    proc.stderr.setEncoding("utf8");
    if (DEBUG) {
      proc.stderr.on("data", (chunk: string) => {
        process.stderr.write(`[${this.serverName} stderr] ${chunk}`);
      });
    } else {
      proc.stderr.resume(); // drain without logging
    }

    // MCP initialize handshake
    await this.request("initialize", {
      protocolVersion: "2024-11-05",
      capabilities: {},
      clientInfo: { name: "pi-mcp-loader", version: "0.1.0" },
    });
    this.notify("notifications/initialized", {});
    const listed = await this.request("tools/list", {});
    // A malformed result must not leave a non-iterable in `tools`.
    this.tools = Array.isArray(listed?.tools) ? (listed.tools as McpTool[]) : [];
  }

  /** Buffer stdout, split it into lines, and settle the request each response line answers. */
  private onStdout(chunk: string) {
    this.stdoutBuf += chunk;
    let nl: number;
    while ((nl = this.stdoutBuf.indexOf("\n")) !== -1) {
      const line = this.stdoutBuf.slice(0, nl).trim();
      this.stdoutBuf = this.stdoutBuf.slice(nl + 1);
      if (!line) continue;
      let msg: any;
      try {
        msg = JSON.parse(line);
      } catch {
        continue; // non-JSON output on stdout is ignored
      }
      // Valid JSON that is not an object (`null`, a number, a string) has no
      // `id` to read; dereferencing it would throw inside this event handler.
      if (msg === null || typeof msg !== "object") continue;
      if (typeof msg.id === "number" && this.pending.has(msg.id)) {
        const { resolve, reject } = this.pending.get(msg.id)!;
        this.pending.delete(msg.id);
        if (msg.error) reject(new Error(msg.error.message ?? "MCP error"));
        else resolve(msg.result);
      }
      // notifications (no id) ignored.
    }
  }

  /** Reject every in-flight request with `err`. */
  private failAll(err: Error) {
    for (const { reject } of this.pending.values()) reject(err);
    this.pending.clear();
  }

  /** Record that the server is gone (first reason wins) and fail in-flight requests. */
  private markClosed(err: Error) {
    if (!this.closedReason) this.closedReason = err;
    this.failAll(err);
  }

  /** Serialize one JSON-RPC message onto the subprocess's stdin. Throws if not started. */
  private write(obj: unknown) {
    if (!this.proc) throw new Error(`${this.serverName}: not started`);
    this.proc.stdin.write(`${JSON.stringify(obj)}\n`);
  }

  /** Send a JSON-RPC notification (no id, no response expected). */
  private notify(method: string, params: unknown) {
    this.write({ jsonrpc: "2.0", method, params });
  }

  /**
   * Send a JSON-RPC request and resolve with its `result`.
   *
   * Rejects if the server answers with an error, if the client is closed
   * (before or while the request is in flight), or if the message cannot be
   * written. No timeout is applied.
   */
  request(method: string, params: unknown): Promise<any> {
    if (this.closedReason) {
      return Promise.reject(
        new Error(`${this.serverName}: server is not running (${this.closedReason.message})`),
      );
    }
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      this.pending.set(id, { resolve, reject });
      try {
        this.write({ jsonrpc: "2.0", id, method, params });
      } catch (err) {
        // Don't leave a dead entry behind when the message never went out.
        this.pending.delete(id);
        reject(err instanceof Error ? err : new Error(String(err)));
      }
    });
  }

  /** Invoke a server tool by its server-side (un-namespaced) name. */
  async callTool(name: string, args: Record<string, unknown>): Promise<any> {
    return this.request("tools/call", { name, arguments: args });
  }

  /** Send SIGTERM to the subprocess (best effort) and close the client. Safe to call repeatedly. */
  stop() {
    const proc = this.proc;
    if (!proc) return;
    this.proc = null;
    this.markClosed(new Error(`${this.serverName}: client stopped`));
    try {
      proc.kill("SIGTERM");
    } catch {}
  }
}
// ── MCP result → pi tool result helpers ──────────────────────────────────────
/**
 * Flatten an MCP tool result into a single string.
 *
 * A falsy result becomes "". A result without a `content` array is returned
 * as its JSON serialization. Otherwise each content item contributes one
 * line: its `text` for well-formed text items, its JSON serialization for
 * anything else.
 */
function extractText(mcpResult: any): string {
  if (!mcpResult) {
    return "";
  }
  const items = mcpResult.content;
  if (!Array.isArray(items)) {
    return JSON.stringify(mcpResult);
  }
  const pieces: string[] = [];
  for (const item of items) {
    const isText = item?.type === "text" && typeof item.text === "string";
    pieces.push(isText ? item.text : JSON.stringify(item));
  }
  return pieces.join("\n");
}
/**
 * Build the name a server's tool is registered under in pi:
 * `<serverName>_<toolName>`, except that a tool name which already starts
 * with `<serverName>_` is returned untouched so the prefix is never doubled.
 */
function namespacedToolName(serverName: string, toolName: string): string {
  const alreadyPrefixed = toolName.indexOf(`${serverName}_`) === 0;
  return alreadyPrefixed ? toolName : `${serverName}_${toolName}`;
}
// ── Settings.json writer ─────────────────────────────────────────────────────
/**
 * Persist a server's on/off state to settings.json.
 *
 * The file is parsed, the one entry is adjusted, and the whole document is
 * re-serialized with 2-space indentation plus a trailing newline; all other
 * keys pass through the JSON round-trip unchanged.
 *
 * Enabling deletes the `enabled` key (true is the default) rather than
 * writing `true`; disabling writes `enabled: false`.
 *
 * @throws Error if the file has no `mcp` block or no entry for `serverName`;
 *   read, parse and write failures propagate as thrown by fs / JSON.
 */
function setServerEnabled(serverName: string, enabled: boolean): void {
  const settings = JSON.parse(fs.readFileSync(SETTINGS_PATH, "utf8"));

  const entry = settings.mcp ? settings.mcp[serverName] : undefined;
  if (!entry) {
    throw new Error(`server '${serverName}' not in settings.json`);
  }

  if (!enabled) {
    entry.enabled = false;
  } else {
    // Enabled is the default, so drop the key instead of storing `true`.
    delete entry.enabled;
  }

  const serialized = JSON.stringify(settings, null, 2);
  fs.writeFileSync(SETTINGS_PATH, `${serialized}\n`, "utf8");
}
// ── Per-server runtime state (for /mcp display) ──────────────────────────────
/**
 * What happened to one configured server when the extension loaded.
 * Recorded per server name by the extension entry point and rendered by the
 * `/mcp` command via `statusLabel`.
 */
type ServerRuntimeState =
// Handshake succeeded; toolCount is the number of tools the server listed.
| { kind: "running"; toolCount: number }
// Starting the client failed; message is the caught error's message.
| { kind: "failed"; message: string }
// Entry has `enabled: false`; nothing was spawned.
| { kind: "disabled" }
// Entry has `type: "remote"`, which this loader does not connect to.
| { kind: "remote-skipped" }
// Entry could not be used as written (e.g. missing or empty `command`).
| { kind: "invalid"; message: string };
// ── Extension entry ──────────────────────────────────────────────────────────
/**
 * Extension entry point: registers `/mcp`, then connects to every enabled
 * local server in the settings.json `mcp` block and exposes each of its
 * tools to pi under a namespaced name.
 *
 * Servers are started one after another, each handshake awaited. A server
 * that is disabled, remote, malformed, or fails to start is recorded in the
 * per-server runtime map (shown by `/mcp`) and skipped; it never prevents
 * the remaining servers from loading.
 *
 * @param pi Extension API used to register the command, tools and the
 *   shutdown hook.
 */
export default async function mcpLoaderExtension(pi: ExtensionAPI) {
  const servers = readMcpServers();
  const runtime = new Map<string, ServerRuntimeState>();
  const clients: StdioMcpClient[] = [];

  // Always register /mcp — even when no servers are configured, so users
  // discover the surface. Handler reads settings.json fresh on each invoke.
  registerMcpCommand(pi, runtime);

  if (Object.keys(servers).length === 0) {
    dlog("no MCP servers configured — nothing to load");
    return;
  }

  // Register teardown BEFORE spawning anything: if something below throws
  // part-way through, subprocesses that were already started still receive
  // SIGTERM at session shutdown instead of leaking across reloads.
  pi.on("session_shutdown", async () => {
    for (const c of clients) c.stop();
  });

  for (const [name, cfg] of Object.entries(servers)) {
    // settings.json is hand-edited; an entry may be null, a string, an array…
    // Reading `.enabled` off null would throw and abort the whole load.
    if (cfg === null || typeof cfg !== "object" || Array.isArray(cfg)) {
      warn(`${name}: invalid server config (must be an object), skipping`);
      runtime.set(name, { kind: "invalid", message: "config must be an object" });
      continue;
    }
    if (cfg.enabled === false) {
      dlog(`${name}: disabled in settings.json, skipping`);
      runtime.set(name, { kind: "disabled" });
      continue;
    }
    if (cfg.type === "remote") {
      warn(
        `${name}: remote MCP transport not yet supported by mcp-loader (v1 is stdio-only); skipping`,
      );
      runtime.set(name, { kind: "remote-skipped" });
      continue;
    }

    const local = cfg as LocalServerConfig;
    if (!Array.isArray(local.command) || local.command.length === 0) {
      warn(`${name}: invalid \`command\` (must be non-empty array), skipping`);
      runtime.set(name, { kind: "invalid", message: "command must be non-empty array" });
      continue;
    }

    const [bin, ...args] = local.command;
    const client = new StdioMcpClient(name);
    try {
      await client.start(bin, args, local.env);
    } catch (err) {
      const message = (err as Error).message;
      warn(`${name}: failed to start — ${message}`);
      runtime.set(name, { kind: "failed", message });
      client.stop();
      continue;
    }

    dlog(`${name}: connected, ${client.tools.length} tools`);
    runtime.set(name, { kind: "running", toolCount: client.tools.length });
    clients.push(client);

    // Register each MCP tool as a pi tool.
    for (const tool of client.tools) {
      // A malformed tools/list entry must not abort the remaining registrations.
      if (!tool || typeof tool.name !== "string") {
        dlog(`${name}: skipping tool entry without a string name`);
        continue;
      }
      const piName = namespacedToolName(name, tool.name);
      // The server's JSON schema is handed to pi as-is; tools that declare no
      // schema accept any object.
      const schema =
        tool.inputSchema && typeof tool.inputSchema === "object"
          ? (Type.Unsafe<Record<string, unknown>>(
              tool.inputSchema as object,
            ) as unknown as ReturnType<typeof Type.Object>)
          : Type.Object({}, { additionalProperties: true });

      pi.registerTool({
        name: piName,
        label: piName,
        description: tool.description ?? `MCP tool from ${name}`,
        parameters: schema,
        async execute(_toolCallId, params) {
          try {
            const result = await client.callTool(tool.name, params as Record<string, unknown>);
            const text = extractText(result);
            return {
              content: [{ type: "text", text }],
              details: { server: name, tool: tool.name, raw: result },
            };
          } catch (err) {
            // Report failures as an error result rather than throwing.
            return {
              content: [
                {
                  type: "text",
                  text: `MCP call failed (${name}/${tool.name}): ${(err as Error).message}`,
                },
              ],
              details: { server: name, tool: tool.name, error: (err as Error).message },
              isError: true,
            };
          }
        },
      });
    }
  }
}
// ── /mcp slash command ──────────────────────────────────────────────────────
// Value strings shown in the /mcp settings list. VAL_ON / VAL_OFF are the two
// values a toggleable row offers; the change handler compares against VAL_ON
// to derive the staged boolean.
const VAL_ON = "● enabled";
const VAL_OFF = "○ disabled";
// Shown for rows whose config is invalid and therefore cannot be toggled.
const VAL_RO = "[read-only]";
/**
 * Human-readable status text for one server row in `/mcp`.
 *
 * @param state Runtime state recorded at load time, or undefined when the
 *   server has no recorded state.
 * @param configEnabled Whether settings.json currently has the server
 *   enabled; only consulted when `state` is missing.
 */
function statusLabel(state: ServerRuntimeState | undefined, configEnabled: boolean): string {
  if (!state) {
    if (configEnabled) return "unknown";
    return "disabled";
  }
  switch (state.kind) {
    case "disabled":
      return "disabled in settings";
    case "remote-skipped":
      return "remote (skipped — v1 stdio only)";
    case "invalid":
      return "invalid: " + state.message;
    case "failed":
      return "failed: " + state.message;
    case "running": {
      const count = state.toolCount;
      const noun = count === 1 ? "tool" : "tools";
      return `running · ${count} ${noun}`;
    }
  }
}
/**
 * Register the `/mcp` slash command.
 *
 * The command lists every server in the settings.json `mcp` block (read
 * fresh on each invocation) together with the runtime state captured at
 * extension load, and lets the user stage enable/disable toggles:
 * space stages, enter writes the staged changes to settings.json and
 * reloads, esc cancels without writing.
 *
 * @param pi Extension API used to register the command.
 * @param runtime Per-server state recorded at load time; read-only here.
 */
function registerMcpCommand(pi: ExtensionAPI, runtime: Map<string, ServerRuntimeState>) {
  pi.registerCommand("mcp", {
    description: "List and toggle MCP servers configured in ~/.pi/agent/settings.json",
    handler: async (_args, ctx) => {
      const servers = readMcpServers();
      const names = Object.keys(servers);
      if (names.length === 0) {
        ctx.ui.notify(
          `No MCP servers configured. Add an \`mcp\` block to ${SETTINGS_PATH}.`,
          "info",
        );
        return;
      }

      // Snapshot config-level enabled state (default true if unset). `?.`
      // because a hand-edited entry may be null.
      const configEnabled = new Map<string, boolean>();
      for (const [n, cfg] of Object.entries(servers)) {
        configEnabled.set(n, cfg?.enabled !== false);
      }
      // Staged state: name → enabled (initialised from configEnabled).
      const staged = new Map(configEnabled);

      // Rows that cannot be toggled: config the loader flagged as invalid, or
      // an entry that is not an object at all.
      const isReadOnly = (n: string): boolean => {
        const cfg: unknown = servers[n];
        return runtime.get(n)?.kind === "invalid" || cfg === null || typeof cfg !== "object";
      };

      await ctx.ui.custom<void>((tui, theme, _kb, done) => {
        let statusText = "";
        // Explicit flag rather than sniffing a prefix of statusText, so the
        // warning colour cannot silently stop matching when a message changes.
        let statusIsWarning = false;

        const items: SettingItem[] = names.map((n) => {
          const cfg = servers[n];
          const cfgEn = configEnabled.get(n) ?? true;
          const transport = cfg?.type === "remote" ? "remote" : "local";
          const desc = `${transport} · ${statusLabel(runtime.get(n), cfgEn)}`;
          if (isReadOnly(n)) {
            // No `values`: the row offers nothing to cycle through.
            return {
              id: n,
              label: n,
              currentValue: VAL_RO,
              description: desc,
            };
          }
          return {
            id: n,
            label: n,
            currentValue: cfgEn ? VAL_ON : VAL_OFF,
            values: [VAL_ON, VAL_OFF],
            description: desc,
          };
        });

        const container = new Container();
        container.addChild({
          render(_w: number) {
            return [
              theme.fg("accent", theme.bold("MCP servers — settings.json `mcp` block")),
              theme.fg(
                "muted",
                " space: toggle (stage) · enter: apply + reload · esc: cancel",
              ),
              "",
            ];
          },
          invalidate() {},
        });

        const settingsList = new SettingsList(
          items,
          Math.min(items.length + 2, 20),
          getSettingsListTheme(),
          (id, newValue) => {
            if (isReadOnly(id)) {
              // Defensive: snap the row back if the list reported a change anyway.
              settingsList.updateValue(id, VAL_RO);
              statusText = `${id}: invalid config — fix in settings.json`;
              statusIsWarning = true;
              tui.requestRender();
              return;
            }
            staged.set(id, newValue === VAL_ON);

            // Summarise every server whose staged value differs from the file.
            const drift: string[] = [];
            for (const n of names) {
              const s = staged.get(n);
              const c = configEnabled.get(n);
              if (s !== undefined && s !== c) drift.push(`${n} → ${s ? "on" : "off"}`);
            }
            statusText = drift.length ? `pending: ${drift.join(", ")}` : "";
            statusIsWarning = false;
            tui.requestRender();
          },
          () => {
            // esc: close without writing anything.
            done(undefined);
          },
        );
        container.addChild(settingsList);

        container.addChild({
          render(_w: number) {
            if (!statusText) return [""];
            const colored = statusIsWarning
              ? theme.fg("warning", statusText)
              : theme.fg("muted", statusText);
            return ["", colored];
          },
          invalidate() {},
        });

        return {
          render(width: number) {
            return container.render(width);
          },
          invalidate() {
            container.invalidate();
          },
          handleInput(data: string) {
            if (matchesKey(data, Key.enter)) {
              const errors: string[] = [];
              let applied = 0;
              for (const n of names) {
                const s = staged.get(n);
                const c = configEnabled.get(n);
                if (s === undefined || s === c) continue;
                try {
                  setServerEnabled(n, s);
                  applied++;
                } catch (err) {
                  const msg = err instanceof Error ? err.message : String(err);
                  errors.push(`${n}: ${msg}`);
                }
              }
              done(undefined);
              if (errors.length) {
                ctx.ui.notify(`mcp: ${errors.join(" | ")}`, "error");
              }
              // Reload whenever anything was written, even if other toggles
              // failed — otherwise the running servers and settings.json
              // silently disagree until the next manual reload.
              if (applied > 0) {
                ctx.ui.notify(`mcp: applied ${applied} change(s); reloading…`, "info");
                ctx.reload().catch((err) => {
                  const msg = err instanceof Error ? err.message : String(err);
                  ctx.ui.notify(`reload failed: ${msg}`, "error");
                });
              }
              return;
            }
            settingsList.handleInput?.(data);
            tui.requestRender();
          },
        };
      });
    },
  });
}