mcp-loader v2: streamable-HTTP transport for remote MCP servers (context7)

- New RemoteMcpClient implementing MCP streamable-HTTP per spec 2025-03-26:
  POST JSON-RPC, parse application/json or text/event-stream responses,
  round-trip optional Mcp-Session-Id, optional auth via 'headers' config.
- Refactor StdioMcpClient to share an IMcpClient interface with the remote
  client; extension entry dispatches on cfg.type. Drops the v1 'remote
  skipped with warning' code path.
- Bump MCP_PROTOCOL_VERSION to 2025-11-25 (single constant, both clients).
- 404 self-heal: when a remote returns 404 to a request carrying our
  Mcp-Session-Id, drop the id, re-initialize, retry the request once
  (per spec 2025-11-25 §2.2). allowReinitOn404=false on the retry path
  prevents recursion. Verified via mock-server smoke test.
- Sanitize pi-facing tool names to ^[A-Za-z][A-Za-z0-9_]{0,63}$. Anthropic
  allows hyphens, but Bedrock's Anthropic shim rejects them, so entire turns
  failed silently with a 4xx once context7's hyphenated tools
  (resolve-library-id, query-docs) were registered. Original MCP-side names
  are preserved in the tool-execute closure, so sanitization is purely
  pi-facing.
- /mcp slash command: drop 'remote (skipped)' status label.
- Docs: README and AGENTS updated for transports, headers config, 404
  self-heal, tool-name sanitization rationale, OAuth limitation.
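
The tool-name sanitization bullet above can be sketched as a standalone function. This is a minimal illustration rather than the committed code: `sanitizeToolName` and `originalNames` are hypothetical names, and the lookup map only stands in for the tool-execute closure that keeps the MCP-side name.

```typescript
// Hypothetical helper: coerce a name into ^[A-Za-z][A-Za-z0-9_]{0,63}$.
function sanitizeToolName(raw: string): string {
  // Replace every character outside [A-Za-z0-9_] with an underscore.
  let name = raw.replace(/[^A-Za-z0-9_]/g, "_");
  // The first character must be a letter, so prepend "t_" otherwise.
  if (!/^[A-Za-z]/.test(name)) name = `t_${name}`;
  // Cap the length at 64 characters.
  return name.slice(0, 64);
}

// Keep the original MCP-side name next to the pi-facing one so that
// tools/call can still be sent with the name the server expects.
// (Assumption: a plain Map replaces the per-tool closure for brevity.)
const originalNames = new Map<string, string>();
for (const mcpName of ["resolve-library-id", "query-docs"]) {
  originalNames.set(sanitizeToolName(`context7_${mcpName}`), mcpName);
}

console.log(Array.from(originalNames.keys()));
```

Here `context7_resolve-library-id` becomes `context7_resolve_library_id`, while the map still yields `resolve-library-id` for the outgoing call. Replacement and truncation can map two distinct names to the same result; this sketch does not handle such collisions.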

End-to-end verified: context7 connects through pi, returns useful docs
(Bun streaming/SSE example fetched successfully).
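
The 404 self-heal described in the commit message can be sketched without any network code. This is an illustrative reduction under stated assumptions: `Reply`, `Send`, and `SelfHealingSession` are hypothetical names, the injected `send` function stands in for the real HTTP POST, and the `notifications/initialized` step is left out.

```typescript
// Hypothetical reply shape: HTTP status plus an optional session id header.
type Reply = { status: number; sessionId?: string; result?: unknown };
// Hypothetical transport: receives the method and the session id to send.
type Send = (method: string, sessionId: string | null) => Promise<Reply>;

class SelfHealingSession {
  private sessionId: string | null = null;
  private send: Send;

  constructor(send: Send) {
    this.send = send;
  }

  async request(method: string, allowReinitOn404 = true): Promise<unknown> {
    const sessionAtStart = this.sessionId;
    const reply = await this.send(method, sessionAtStart);
    if (reply.sessionId) this.sessionId = reply.sessionId;

    // 404 to a request that carried a session id: the server forgot us.
    if (reply.status === 404 && sessionAtStart && allowReinitOn404) {
      this.sessionId = null; // drop the stale id before re-initializing
      await this.request("initialize", false);
      return this.request(method, false); // retry once, no further healing
    }
    if (reply.status !== 200) {
      throw new Error(`HTTP ${reply.status} on ${method}`);
    }
    return reply.result;
  }
}
```

Passing `false` on both the re-initialize and the retry is what bounds recovery to one attempt: a second 404 falls through to the error branch instead of recursing.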
2026-05-09 15:26:36 +02:00
parent 7eec49b9b8
commit 37cc49e06f
3 changed files with 349 additions and 72 deletions
+287 -48
@@ -22,7 +22,8 @@
* },
* "context7": {
* "type": "remote",
* "url": "https://mcp.context7.com/mcp"
* "url": "https://mcp.context7.com/mcp",
* "headers": { "Authorization": "Bearer …" }
* }
* }
* }
@@ -32,11 +33,13 @@
* command argv array — first element is the executable, rest are args.
* For local servers only.
* url remote MCP endpoint URL. For remote servers only.
* headers optional object of HTTP headers to send with every request to
* a remote server (e.g. Authorization, X-API-Key). Remote only.
* enabled default true. Set false to disable a server without
* removing it from settings.json.
* env optional object of environment variables to inject into
* the spawned subprocess. Inherits parent env first, then
* overlays these keys.
* overlays these keys. Local only.
*
* Tool naming
*
@@ -47,22 +50,35 @@
*
* Lifecycle
*
* • Servers are spawned at extension load time (pi startup or /reload).
* • Servers are connected at extension load time (pi startup or /reload).
* • Each server's `tools/list` is awaited before pi finishes registering.
* • Subprocess stderr is silenced unless PI_MCP_LOADER_DEBUG=1.
* • Subprocesses receive SIGTERM at session_shutdown.
* • A server that fails to start (binary missing, init handshake error)
* logs a single line to stderr and is skipped. Other servers continue.
* Pi keeps working without that server's tools.
* • Local subprocess stderr is silenced unless PI_MCP_LOADER_DEBUG=1.
* • Local subprocesses receive SIGTERM at session_shutdown; remote
* servers receive an MCP DELETE if a session id was issued.
* • A server that fails to start (binary missing, init handshake error,
* remote 4xx/5xx) logs a single line to stderr and is skipped. Other
* servers continue. Pi keeps working without that server's tools.
*
* Limitations (v1)
* Transports
*
* • Only local/stdio transport is implemented. Remote (streamable-http,
* SSE) servers are detected and skipped with a warning. v2 will add
* remote support — context7 is the prime motivator.
* • No reconnect: if a subprocess dies mid-session, its tools become
* unavailable until pi is reloaded. The mempalace.ts extension has
* the same limitation today.
local — stdio JSON-RPC subprocess (mcp-searxng, gitea-mcp, …).
* remote — streamable-HTTP per MCP spec 2025-03-26: POST JSON-RPC, server
* replies either application/json or text/event-stream. Optional
* Mcp-Session-Id header round-tripped if the server issues one.
* No GET-stream subscription (we don't consume server-initiated
* notifications).
*
* Limitations
*
* • No stdio reconnect: if a stdio subprocess dies mid-session, its tools
* become unavailable until pi is reloaded. The mempalace.ts extension
* has the same limitation today.
* • Remote sessions self-heal on 404: if a streamable-HTTP server
* forgets our session id (e.g. server restart), the next request gets
* HTTP 404; the client transparently re-initializes and retries the
* request once. Persistent 404s after refresh surface as errors.
* • No OAuth flow: remote servers requiring OAuth must be accessed with
* a pre-issued bearer token via `headers`.
*
* Coexistence with mempalace.ts
*
@@ -76,7 +92,7 @@
* Slash command
*
* `/mcp` lists configured MCP servers with their runtime status
* (running / failed / disabled / remote-skipped / invalid) and lets
* (running / failed / disabled / invalid) and lets
* you toggle the `enabled` flag in settings.json. Same UX as `/ext`:
* space stages, enter applies + reloads, esc cancels.
*/
@@ -116,6 +132,7 @@ type LocalServerConfig = {
type RemoteServerConfig = {
type: "remote";
url: string;
headers?: Record<string, string>;
enabled?: boolean;
};
@@ -124,6 +141,13 @@ type ServerConfig = LocalServerConfig | RemoteServerConfig;
const SETTINGS_PATH = path.join(os.homedir(), ".pi", "agent", "settings.json");
const DEBUG = process.env.PI_MCP_LOADER_DEBUG === "1";
// MCP protocol version we advertise. Servers we've tested:
// - searxng (mcp-searxng) accepts both 2024-11-05 and 2025-11-25
// - context7 accepts both
// - mempalace bridge: not touched by this loader
// Bump as the spec evolves; both stdio and remote clients use this.
const MCP_PROTOCOL_VERSION = "2025-11-25";
function dlog(msg: string) {
if (DEBUG) process.stderr.write(`[mcp-loader] ${msg}\n`);
}
@@ -162,21 +186,35 @@ function readMcpServers(): Record<string, ServerConfig> {
// ── Stdio MCP client ─────────────────────────────────────────────────────────
class StdioMcpClient {
interface IMcpClient {
readonly serverName: string;
tools: McpTool[];
start(): Promise<void>;
callTool(name: string, args: Record<string, unknown>): Promise<any>;
stop(): void | Promise<void>;
}
class StdioMcpClient implements IMcpClient {
private proc: ChildProcessWithoutNullStreams | null = null;
private nextId = 1;
private pending = new Map<number, { resolve: (v: any) => void; reject: (e: Error) => void }>();
private stdoutBuf = "";
private serverName: string;
public readonly serverName: string;
private command: string;
private args: string[];
private env: Record<string, string> | undefined;
public tools: McpTool[] = [];
constructor(serverName: string) {
constructor(serverName: string, command: string, args: string[], env?: Record<string, string>) {
this.serverName = serverName;
this.command = command;
this.args = args;
this.env = env;
}
async start(command: string, args: string[], env?: Record<string, string>): Promise<void> {
const childEnv = { ...process.env, ...(env ?? {}) };
this.proc = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"], env: childEnv });
async start(): Promise<void> {
const childEnv = { ...process.env, ...(this.env ?? {}) };
this.proc = spawn(this.command, this.args, { stdio: ["pipe", "pipe", "pipe"], env: childEnv });
this.proc.on("error", (err) => this.failAll(err));
this.proc.on("exit", (code, signal) => {
@@ -197,9 +235,9 @@ class StdioMcpClient {
// MCP initialize handshake
await this.request("initialize", {
protocolVersion: "2024-11-05",
protocolVersion: MCP_PROTOCOL_VERSION,
capabilities: {},
clientInfo: { name: "pi-mcp-loader", version: "0.1.0" },
clientInfo: { name: "pi-mcp-loader", version: "0.2.0" },
});
this.notify("notifications/initialized", {});
@@ -266,6 +304,195 @@ class StdioMcpClient {
}
}
// ── Streamable-HTTP MCP client (remote) ──────────────────────────────────────
//
// Per MCP spec 2025-03-26: a single endpoint URL accepts POST with a
// JSON-RPC body. The server responds either with application/json (a
// single response) or text/event-stream (an SSE stream containing the
// response, possibly preceded by notifications, terminated by stream
// close). We do NOT open a separate GET subscription stream — we don't
// consume server-initiated notifications today.
//
// Session id: if the server returns an Mcp-Session-Id header on
// initialize, we round-trip it on every subsequent request and DELETE
// it on stop().
class RemoteMcpClient implements IMcpClient {
public readonly serverName: string;
private url: string;
private extraHeaders: Record<string, string>;
private sessionId: string | null = null;
private nextId = 1;
public tools: McpTool[] = [];
constructor(serverName: string, url: string, headers?: Record<string, string>) {
this.serverName = serverName;
this.url = url;
this.extraHeaders = headers ?? {};
}
async start(): Promise<void> {
await this.request("initialize", {
protocolVersion: MCP_PROTOCOL_VERSION,
capabilities: {},
clientInfo: { name: "pi-mcp-loader", version: "0.2.0" },
});
await this.notify("notifications/initialized", {});
const listed = await this.request("tools/list", {});
this.tools = (listed?.tools as McpTool[]) ?? [];
}
async callTool(name: string, args: Record<string, unknown>): Promise<any> {
return this.request("tools/call", { name, arguments: args });
}
async stop(): Promise<void> {
if (!this.sessionId) return;
try {
await fetch(this.url, { method: "DELETE", headers: this.buildHeaders() });
} catch {}
}
private buildHeaders(): Record<string, string> {
const h: Record<string, string> = {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
"MCP-Protocol-Version": MCP_PROTOCOL_VERSION,
...this.extraHeaders,
};
if (this.sessionId) h["Mcp-Session-Id"] = this.sessionId;
return h;
}
/**
* Re-establish a session after the server returned 404 to a request that
* carried our `Mcp-Session-Id`. Per spec 2025-11-25, the client MUST start
* a new session by sending a fresh InitializeRequest without a session id.
*
* Caller must clear `this.sessionId` BEFORE invoking this so the new
* initialize POST goes out without the stale id.
*/
private async reinitialize(): Promise<void> {
dlog(`${this.serverName}: session lost (404) — reinitializing`);
await this.request("initialize", {
protocolVersion: MCP_PROTOCOL_VERSION,
capabilities: {},
clientInfo: { name: "pi-mcp-loader", version: "0.2.0" },
}, /*allowReinitOn404=*/ false);
await this.notify("notifications/initialized", {});
}
private async request(method: string, params: unknown, allowReinitOn404 = true): Promise<any> {
const id = this.nextId++;
const body = JSON.stringify({ jsonrpc: "2.0", id, method, params });
const sessionAtStart = this.sessionId;
let res: Response;
try {
res = await fetch(this.url, { method: "POST", headers: this.buildHeaders(), body });
} catch (err) {
throw new Error(`${this.serverName}: fetch failed on ${method}: ${(err as Error).message}`);
}
const sid = res.headers.get("Mcp-Session-Id") ?? res.headers.get("mcp-session-id");
if (sid) this.sessionId = sid;
// 404 + we sent a session id → server forgot us. Drop our id, re-init,
// retry once. `allowReinitOn404=false` is passed by the recovery path to
// prevent infinite recursion if the server is permanently broken.
if (res.status === 404 && sessionAtStart && allowReinitOn404) {
try { await res.arrayBuffer(); } catch {}
this.sessionId = null;
await this.reinitialize();
return this.request(method, params, /*allowReinitOn404=*/ false);
}
if (!res.ok) {
let detail = "";
try { detail = (await res.text()).slice(0, 200); } catch {}
throw new Error(`${this.serverName}: HTTP ${res.status} on ${method}${detail ? `: ${detail}` : ""}`);
}
const ct = (res.headers.get("content-type") ?? "").toLowerCase();
if (ct.includes("text/event-stream")) {
return await this.readSseForResponse(res, id);
}
if (ct.includes("application/json")) {
const json: any = await res.json();
if (json.error) throw new Error(json.error.message ?? "MCP error");
return json.result;
}
if (res.status === 202) return undefined;
throw new Error(`${this.serverName}: unexpected content-type "${ct}" on ${method}`);
}
private async notify(method: string, params: unknown): Promise<void> {
const body = JSON.stringify({ jsonrpc: "2.0", method, params });
let res: Response;
try {
res = await fetch(this.url, { method: "POST", headers: this.buildHeaders(), body });
} catch (err) {
throw new Error(`${this.serverName}: fetch failed on notify ${method}: ${(err as Error).message}`);
}
try { await res.arrayBuffer(); } catch {}
if (!res.ok && res.status !== 202) {
throw new Error(`${this.serverName}: HTTP ${res.status} on notify ${method}`);
}
}
private async readSseForResponse(res: Response, expectedId: number): Promise<any> {
if (!res.body) throw new Error(`${this.serverName}: SSE response had no body`);
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
let dataLines: string[] = [];
const tryDispatch = (): { matched: boolean; result?: any } => {
if (dataLines.length === 0) return { matched: false };
const data = dataLines.join("\n");
dataLines = [];
let msg: any;
try {
msg = JSON.parse(data);
} catch {
return { matched: false };
}
if (typeof msg.id === "number" && msg.id === expectedId) {
if (msg.error) throw new Error(msg.error.message ?? "MCP error");
return { matched: true, result: msg.result };
}
return { matched: false };
};
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let nl: number;
while ((nl = buf.indexOf("\n")) !== -1) {
const rawLine = buf.slice(0, nl);
buf = buf.slice(nl + 1);
const line = rawLine.endsWith("\r") ? rawLine.slice(0, -1) : rawLine;
if (line === "") {
const out = tryDispatch();
if (out.matched) return out.result;
} else if (line.startsWith(":")) {
// SSE comment — ignore (often a keepalive)
} else if (line.startsWith("data:")) {
dataLines.push(line.slice(5).replace(/^ /, ""));
}
// event:, id:, retry: — ignored; only `data` matters for our use
}
}
const out = tryDispatch();
if (out.matched) return out.result;
} finally {
try { reader.cancel(); } catch {}
}
throw new Error(`${this.serverName}: SSE stream closed without response for id=${expectedId}`);
}
}
// ── MCP result → pi tool result helpers ──────────────────────────────────────
function extractText(mcpResult: any): string {
@@ -284,8 +511,20 @@ function namespacedToolName(serverName: string, toolName: string): string {
// Avoid double-prefix: if the MCP server already names its tools
// `<serverName>_<...>`, leave them alone.
const prefix = `${serverName}_`;
if (toolName.startsWith(prefix)) return toolName;
return `${prefix}${toolName}`;
const raw = toolName.startsWith(prefix) ? toolName : `${prefix}${toolName}`;
// Sanitize for the strictest tool-name regex we have to satisfy.
// Anthropic's Messages API allows `^[a-zA-Z0-9_-]{1,64}$` (hyphens OK),
// but AWS Bedrock's Anthropic shim rejects names containing `-` outright
// — it accepts `^[a-zA-Z][a-zA-Z0-9_]{0,63}$`. Hyphenated MCP tool names
// (context7's `resolve-library-id`, `query-docs`) caused entire turns to
// 4xx silently on Bedrock, manifesting as no output at all once the
// server was enabled. Replace any character outside [A-Za-z0-9_] with `_`,
// then prepend `t_` if the result does not start with a letter.
// Truncate to 64 to also satisfy Anthropic's length cap.
let sanitized = raw.replace(/[^A-Za-z0-9_]/g, "_");
if (!/^[A-Za-z]/.test(sanitized)) sanitized = `t_${sanitized}`;
if (sanitized.length > 64) sanitized = sanitized.slice(0, 64);
return sanitized;
}
// ── Settings.json writer ─────────────────────────────────────────────────────
@@ -316,7 +555,6 @@ type ServerRuntimeState =
| { kind: "running"; toolCount: number }
| { kind: "failed"; message: string }
| { kind: "disabled" }
| { kind: "remote-skipped" }
| { kind: "invalid"; message: string };
// ── Extension entry ──────────────────────────────────────────────────────────
@@ -324,7 +562,7 @@ type ServerRuntimeState =
export default async function mcpLoaderExtension(pi: ExtensionAPI) {
const servers = readMcpServers();
const runtime = new Map<string, ServerRuntimeState>();
const clients: StdioMcpClient[] = [];
const clients: IMcpClient[] = [];
// Always register /mcp — even when no servers are configured, so users
// discover the surface. Handler reads settings.json fresh on each invoke.
@@ -342,30 +580,33 @@ export default async function mcpLoaderExtension(pi: ExtensionAPI) {
continue;
}
let client: IMcpClient;
if (cfg.type === "remote") {
warn(
`${name}: remote MCP transport not yet supported by mcp-loader (v1 is stdio-only); skipping`,
);
runtime.set(name, { kind: "remote-skipped" });
continue;
const remote = cfg as RemoteServerConfig;
if (typeof remote.url !== "string" || remote.url.length === 0) {
warn(`${name}: remote server missing \`url\`, skipping`);
runtime.set(name, { kind: "invalid", message: "remote server missing url" });
continue;
}
client = new RemoteMcpClient(name, remote.url, remote.headers);
} else {
const local = cfg as LocalServerConfig;
if (!Array.isArray(local.command) || local.command.length === 0) {
warn(`${name}: invalid \`command\` (must be non-empty array), skipping`);
runtime.set(name, { kind: "invalid", message: "command must be non-empty array" });
continue;
}
const [bin, ...args] = local.command;
client = new StdioMcpClient(name, bin, args, local.env);
}
const local = cfg as LocalServerConfig;
if (!Array.isArray(local.command) || local.command.length === 0) {
warn(`${name}: invalid \`command\` (must be non-empty array), skipping`);
runtime.set(name, { kind: "invalid", message: "command must be non-empty array" });
continue;
}
const [bin, ...args] = local.command;
const client = new StdioMcpClient(name);
try {
await client.start(bin, args, local.env);
await client.start();
} catch (err) {
const message = (err as Error).message;
warn(`${name}: failed to start — ${message}`);
runtime.set(name, { kind: "failed", message });
client.stop();
try { await client.stop(); } catch {}
continue;
}
@@ -413,9 +654,9 @@ export default async function mcpLoaderExtension(pi: ExtensionAPI) {
}
}
// Tear down subprocesses on session shutdown so reloads don't leak procs.
// Tear down clients (stdio subprocesses, remote sessions) on session shutdown.
pi.on("session_shutdown", async () => {
for (const c of clients) c.stop();
await Promise.allSettled(clients.map((c) => Promise.resolve(c.stop())));
});
}
@@ -434,8 +675,6 @@ function statusLabel(state: ServerRuntimeState | undefined, configEnabled: boole
return `failed: ${state.message}`;
case "disabled":
return "disabled in settings";
case "remote-skipped":
return "remote (skipped — v1 stdio only)";
case "invalid":
return `invalid: ${state.message}`;
}