Files
mempalace-toolkit/extensions/pi/mempalace.ts
T
joakimp a3b8829991 feat(pi-ext): per-request timeout + stall-kill for mempalace-mcp
A wedged mempalace-mcp (classically an OrbStack virtiofs cold-open of a
large chroma.sqlite3 / HNSW load) left the awaiting JSON-RPC promise
pending forever, freezing the pi TUI uninterruptibly: ESC cancels the
LLM stream, not a pending tool execute().

The JSON-RPC client now arms a per-request timer. On expiry it rejects
the request AND kills the stalled child (SIGTERM->SIGKILL), so pi gets
an error instead of hanging; the extension flips available=false so
later calls fail fast (restart pi to retry). Per-REQUEST, not
per-process: the long-lived server only dies on a genuine stall.

Knobs: MEMPALACE_MCP_TIMEOUT_MS (default 60000),
MEMPALACE_MCP_INIT_TIMEOUT_MS (default 120000), 0 = disable.

This supersedes the planned standalone stdio-watchdog shim: the
extension already owns request/response correlation, so a separate
framing-reparsing shim is unnecessary.
2026-06-13 23:48:33 +02:00

386 lines
13 KiB
TypeScript

/**
* MemPalace ↔ pi bridge.
*
* Spawns the `mempalace-mcp` MCP stdio server as a subprocess, performs the
* MCP `initialize` handshake, lists available tools, and registers each
* one as a pi tool that proxies to `tools/call`.
*
* Lifecycle automation (per ~/.agents/skills/mempalace/SKILL.md):
* - Wake-up (auto): on first user prompt of a fresh session, inject
* `mempalace_status` + `mempalace_diary_read` output as context so the
* agent orients itself the way the mempalace skill describes. Skipped
* on resume/fork (palace context is already in the thread).
* - Wind-down (manual): `/mempalace-diary` command prompts the LLM to
* write an AAAK-formatted diary entry. Not fully auto because pi
* sessions are typically short/tactical and session_shutdown is too
* late to drive an LLM turn.
*
* Identity: `agent_name` for diary calls comes from $MEMPALACE_AGENT_NAME,
* defaulting to "pi". First diary write creates `wing_pi`.
*
* Fail-soft: if the MCP subprocess can't start, pi keeps working without
* palace tools (warning on stderr only).
*
* Stall protection: every JSON-RPC request carries a timeout. If
* `mempalace-mcp` wedges (e.g. OrbStack virtiofs cold-open of a large
* chroma.sqlite3 / HNSW load), the awaiting promise would otherwise hang
* forever and freeze the pi TUI (ESC cancels the LLM stream, not a pending
* tool execution). On timeout we reject the request AND kill the wedged
* child, so pi gets an error instead of hanging and later calls fail fast.
* This is a per-REQUEST timeout, not a process-lifetime one — the
* long-lived server is only killed when a request genuinely stalls.
* - MEMPALACE_MCP_TIMEOUT_MS tool-call/request timeout (default 60000)
* - MEMPALACE_MCP_INIT_TIMEOUT_MS initialize+tools/list timeout (default 120000)
* Set either to 0 to disable (legacy unbounded behavior).
*
* Debug: set MEMPALACE_EXT_DEBUG=1 to surface mempalace-mcp stderr.
*/
import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import { Type } from "typebox";
// Minimal MCP stdio JSON-RPC client. MCP uses newline-delimited JSON.
/**
 * One entry of the server's `tools/list` result, as consumed by this file.
 * Only `name` is required; everything else is optional metadata.
 */
interface McpTool {
  /** Tool identifier; registered verbatim as the pi tool name. */
  name: string;
  /** Free-text description, passed through as the pi tool description. */
  description?: string;
  /** JSON Schema for the tool's arguments; kept opaque and passed through. */
  inputSchema?: unknown;
}
/** Bookkeeping for one in-flight JSON-RPC request, keyed by its id. */
interface Pending {
  /** Settles the caller's promise with the response's `result`. */
  resolve: (value: any) => void;
  /** Settles the caller's promise with a failure (RPC error, timeout, death). */
  reject: (error: Error) => void;
  /** Timeout timer to clear on settlement; `null` when timeouts are disabled. */
  timer: ReturnType<typeof setTimeout> | null;
}
/**
 * Parse a non-negative numeric environment value (used for timeouts in ms).
 *
 * @param envVal   raw `process.env` value; `undefined`, empty, or
 *                 whitespace-only all mean "not set"
 * @param fallback value returned when the input is unset, non-numeric,
 *                 non-finite, or negative
 * @returns the parsed number; an explicit `0` is preserved (callers treat
 *          it as "disabled")
 */
const num = (envVal: string | undefined, fallback: number): number => {
  // `Number("")` and `Number("  ")` evaluate to 0, which would silently
  // *disable* the timeout for an accidentally blank variable. Treat blank
  // values as unset instead.
  if (envVal === undefined || envVal.trim() === "") return fallback;
  const n = Number(envVal);
  return Number.isFinite(n) && n >= 0 ? n : fallback;
};
/**
 * Minimal MCP stdio JSON-RPC client for one long-lived child process.
 *
 * Requests are newline-delimited JSON objects written to the child's stdin.
 * Responses are matched back to their promise by numeric `id`. Each request
 * may carry a timeout. On expiry the request is rejected, the child is
 * terminated (SIGTERM, then SIGKILL after 2 s), and the client is marked dead
 * immediately, so later requests fail fast. There is no respawn.
 */
class McpClient {
  /** `setTimeout` coerces delays above 2^31-1 ms to 1 ms; clamp to this. */
  private static readonly MAX_TIMER_MS = 2_147_483_647;
  private proc: ChildProcessWithoutNullStreams | null = null;
  private nextId = 1;
  // In-flight requests, keyed by JSON-RPC id.
  private pending = new Map<number, Pending>();
  // Stdout text received so far that does not yet form a complete line.
  private stdoutBuf = "";
  // Memoized startup promise so repeated start() calls share one handshake.
  private ready: Promise<void> | null = null;
  // Set on the first death (exit, spawn error, broken stdin, stall-kill).
  // There is no respawn, so it never resets.
  private dead = false;
  /** Tools advertised by the server's `tools/list` reply (filled by start()). */
  public tools: McpTool[] = [];
  // Per-request timeouts (ms). 0 = disabled (unbounded, legacy behavior).
  private requestTimeoutMs = num(process.env.MEMPALACE_MCP_TIMEOUT_MS, 60_000);
  private initTimeoutMs = num(process.env.MEMPALACE_MCP_INIT_TIMEOUT_MS, 120_000);
  // Fired once when the child process dies (exit, spawn error, broken stdin,
  // or stall-kill). Lets the extension flip `available` so later tool calls
  // fail fast.
  public onExit: (() => void) | null = null;

  /**
   * Spawn the server, run the MCP `initialize` handshake, and fetch
   * `tools/list` into `tools`. Idempotent: later calls return the same promise.
   *
   * @param command executable to spawn
   * @param args    command-line arguments
   * @returns resolves once the tool list has been loaded
   * @throws (rejects) if the process cannot be spawned, dies, answers with a
   *         JSON-RPC error, or the handshake exceeds the init timeout
   */
  async start(command: string, args: string[] = []): Promise<void> {
    if (this.ready) return this.ready;
    this.ready = (async () => {
      const proc = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"] });
      this.proc = proc;
      proc.on("error", (err) => this.handleDeath(err));
      proc.on("exit", (code) =>
        this.handleDeath(new Error(`mempalace-mcp exited (code=${code})`)),
      );
      // Writing to the stdin of a dead (or never-spawned) child fails
      // asynchronously with an 'error' event on the stream (e.g. EPIPE).
      // Unhandled, Node turns that into an uncaught exception and takes pi
      // down, which defeats fail-soft. Treat it as the child dying instead.
      proc.stdin.on("error", (err) => this.handleDeath(err));
      proc.stdout.setEncoding("utf8");
      proc.stdout.on("data", (chunk: string) => this.onStdout(chunk));
      // Drain stderr silently. Re-enable by setting MEMPALACE_EXT_DEBUG=1.
      proc.stderr.setEncoding("utf8");
      if (process.env.MEMPALACE_EXT_DEBUG) {
        proc.stderr.on("data", (chunk: string) => {
          process.stderr.write(`[mempalace-mcp stderr] ${chunk}`);
        });
      } else {
        proc.stderr.resume(); // drain without logging
      }
      // MCP initialize handshake. Cold-open over virtiofs can be slow, so
      // use the (longer) init timeout here rather than the per-call one.
      await this.request(
        "initialize",
        {
          protocolVersion: "2024-11-05",
          capabilities: {},
          clientInfo: { name: "pi-mempalace-ext", version: "0.1.0" },
        },
        this.initTimeoutMs,
      );
      this.notify("notifications/initialized", {});
      const listed = await this.request("tools/list", {}, this.initTimeoutMs);
      this.tools = (listed?.tools as McpTool[]) ?? [];
    })();
    return this.ready;
  }

  /**
   * Accumulate stdout and dispatch each complete newline-terminated line as a
   * JSON-RPC message. The following are ignored: blank lines, non-JSON
   * output, non-object JSON values, notifications (no numeric `id`), and
   * replies for requests that are no longer pending (e.g. already timed out).
   */
  private onStdout(chunk: string) {
    this.stdoutBuf += chunk;
    let nl: number;
    while ((nl = this.stdoutBuf.indexOf("\n")) !== -1) {
      const line = this.stdoutBuf.slice(0, nl).trim();
      this.stdoutBuf = this.stdoutBuf.slice(nl + 1);
      if (!line) continue;
      let msg: any;
      try {
        msg = JSON.parse(line);
      } catch {
        continue;
      }
      // JSON.parse can return null or a primitive; reading `.id` off null
      // would throw inside the 'data' handler.
      if (msg === null || typeof msg !== "object") continue;
      if (typeof msg.id !== "number") continue; // notification
      const p = this.pending.get(msg.id);
      if (!p) continue; // late reply for a settled/timed-out request
      this.settle(msg.id);
      if (msg.error) p.reject(new Error(msg.error.message ?? "MCP error"));
      else p.resolve(msg.result);
    }
  }

  /** Remove a pending request and clear its timeout timer. */
  private settle(id: number) {
    const p = this.pending.get(id);
    if (p?.timer) clearTimeout(p.timer);
    this.pending.delete(id);
  }

  /** Reject and remove every in-flight request with `err`. */
  private failAll(err: Error) {
    for (const id of [...this.pending.keys()]) {
      const p = this.pending.get(id)!;
      this.settle(id);
      p.reject(err);
    }
  }

  /**
   * Child died (exit, spawn error, broken stdin, or stall-kill). Drop the
   * process handle, reject everything in flight, and notify `onExit` once.
   * Safe to call repeatedly, e.g. a stall-kill followed by the real 'exit'.
   */
  private handleDeath(err: Error) {
    this.proc = null;
    this.failAll(err);
    if (this.dead) return;
    this.dead = true;
    try {
      this.onExit?.();
    } catch {}
  }

  /** Serialize one JSON-RPC message as a single line on the child's stdin. */
  private write(obj: unknown) {
    if (!this.proc) throw new Error("MCP process not started");
    this.proc.stdin.write(`${JSON.stringify(obj)}\n`);
  }

  /** Send a JSON-RPC notification (no id, no response expected). */
  private notify(method: string, params: unknown) {
    this.write({ jsonrpc: "2.0", method, params });
  }

  /**
   * Send a JSON-RPC request and resolve with its `result`.
   *
   * @param method    JSON-RPC method name
   * @param params    request params
   * @param timeoutMs per-request timeout in ms; non-positive disables it.
   *                  Defaults to MEMPALACE_MCP_TIMEOUT_MS.
   * @returns the response's `result` field
   * @throws (rejects) on a JSON-RPC error reply, when no process is running,
   *         when the child dies, or on timeout. On timeout the child is also
   *         terminated and the client marked dead.
   */
  request(method: string, params: unknown, timeoutMs = this.requestTimeoutMs): Promise<any> {
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      let timer: ReturnType<typeof setTimeout> | null = null;
      if (timeoutMs > 0) {
        timer = setTimeout(
          () => {
            if (!this.pending.has(id)) return;
            this.settle(id);
            reject(
              new Error(
                `mempalace-mcp request '${method}' timed out after ${timeoutMs}ms ` +
                  `(server wedged — likely cold storage open). Terminating the ` +
                  `stalled server; restart pi to retry palace tools.`,
              ),
            );
            // Kill the wedged child, then declare it dead right away rather
            // than waiting for 'exit': a process stuck in uninterruptible
            // filesystem I/O may not exit until that I/O returns, and until
            // then every later call would wait out a full timeout of its own.
            this.kill();
            this.handleDeath(
              new Error(`mempalace-mcp terminated after request '${method}' stalled`),
            );
          },
          // Above 2^31-1 ms setTimeout would fire after 1 ms and kill the
          // server immediately, so clamp huge configured values.
          Math.min(timeoutMs, McpClient.MAX_TIMER_MS),
        );
        // Don't let a pending MCP timer keep the event loop alive.
        if (typeof timer.unref === "function") timer.unref();
      }
      this.pending.set(id, { resolve, reject, timer });
      try {
        this.write({ jsonrpc: "2.0", id, method, params });
      } catch (err) {
        this.settle(id);
        reject(err as Error);
      }
    });
  }

  /** Invoke an MCP tool via `tools/call` with the default request timeout. */
  async callTool(name: string, args: Record<string, unknown>): Promise<any> {
    return this.request("tools/call", { name, arguments: args });
  }

  /** SIGTERM, then SIGKILL after a 2 s grace period, for stall recovery. */
  private kill() {
    const proc = this.proc;
    if (!proc) return;
    try {
      proc.kill("SIGTERM");
    } catch {}
    const escalate = setTimeout(() => {
      try {
        proc.kill("SIGKILL");
      } catch {}
    }, 2_000);
    if (typeof escalate.unref === "function") escalate.unref();
    // No need to escalate once the child is actually gone.
    proc.once("exit", () => clearTimeout(escalate));
  }

  /** Best-effort shutdown: SIGTERM the child (if any) and drop the handle. */
  stop() {
    if (this.proc) {
      try {
        this.proc.kill("SIGTERM");
      } catch {}
      this.proc = null;
    }
  }
}
/**
 * pi extension entry point wiring MemPalace into pi.
 *
 * Starts `mempalace-mcp`, registers each tool it advertises as a pi tool,
 * hooks `session_start` / `before_agent_start` for the one-shot wake-up
 * injection, and adds the `/mempalace-diary` command. If the server cannot
 * be started, it writes a warning to stderr, stops any half-started child,
 * and returns without registering anything (fail-soft).
 *
 * @param pi the pi extension API used to register tools, handlers and commands
 */
export default async function mempalaceExtension(pi: ExtensionAPI) {
  const client = new McpClient();
  // Whether tool calls should be attempted; cleared when the child dies.
  let available = false;
  const agentName = process.env.MEMPALACE_AGENT_NAME ?? "pi";
  // Gate: inject wake-up context only on the first before_agent_start of a
  // fresh session. Set true on resume/fork (context already in thread).
  let wokeUp = false;
  try {
    client.onExit = () => {
      // Child died (stall-kill or crash): later tool calls return a clear
      // error instead of trying to talk to a dead/respawn-less server.
      available = false;
    };
    await client.start("mempalace-mcp");
    available = true;
  } catch (err) {
    // A failed handshake (e.g. a JSON-RPC error reply to `initialize`) can
    // leave the child running. Nothing below registers a shutdown hook on
    // this path, so stop it here rather than leaking the process.
    client.stop();
    process.stderr.write(
      `[mempalace ext] failed to start mempalace-mcp: ${(err as Error).message}\n`,
    );
    return; // fail-soft: pi keeps working without palace tools
  }
  // Register MCP tools as pi tools. Pass the MCP `inputSchema` through as
  // the pi `parameters` schema so the LLM sees the real parameter names
  // (e.g. `agent_name`, not guessed `agent`). TypeBox schemas are plain
  // JSON Schema at runtime, so `Type.Unsafe` is sufficient to wrap an
  // externally-sourced JSON Schema — no conversion needed.
  for (const tool of client.tools) {
    const schema =
      tool.inputSchema && typeof tool.inputSchema === "object"
        ? (Type.Unsafe<Record<string, unknown>>(tool.inputSchema as object) as unknown as ReturnType<typeof Type.Object>)
        : Type.Object({}, { additionalProperties: true });
    pi.registerTool({
      name: tool.name,
      label: tool.name,
      description: tool.description ?? `MemPalace tool: ${tool.name}`,
      parameters: schema,
      async execute(_toolCallId, params) {
        if (!available) {
          return {
            content: [{ type: "text", text: "mempalace-mcp not available" }],
            details: {},
            isError: true,
          };
        }
        try {
          const result = await client.callTool(tool.name, (params ?? {}) as Record<string, unknown>);
          // MCP tool results use { content: [...], isError?: boolean }.
          // `JSON.stringify(undefined)` returns undefined, which would yield a
          // text block with no text; serialize a missing result as "null".
          return {
            content: result?.content ?? [{ type: "text", text: JSON.stringify(result ?? null) }],
            details: { raw: result },
            isError: result?.isError === true,
          };
        } catch (err) {
          return {
            content: [{ type: "text", text: `MCP call failed: ${(err as Error).message}` }],
            details: {},
            isError: true,
          };
        }
      },
    });
  }
  pi.on("session_shutdown", async () => {
    client.stop();
  });
  pi.on("session_start", async (event, ctx) => {
    // On resume/fork, the previous session's palace context is already in
    // the thread — skip the wake-up injection.
    if (event.reason === "resume" || event.reason === "fork") {
      wokeUp = true;
    }
    ctx.ui.notify(
      `mempalace bridge: ${client.tools.length} tools registered (agent=${agentName})`,
      "info",
    );
  });
  // --- Auto wake-up (mempalace skill Phase 1) ---
  pi.on("before_agent_start", async (_event, _ctx) => {
    if (wokeUp || !available) return;
    wokeUp = true; // one-shot, even if the calls below fail
    const sections: string[] = [];
    try {
      const status = await client.callTool("mempalace_status", {});
      const text = extractText(status);
      if (text) sections.push(`## mempalace_status\n\n${text}`);
    } catch (err) {
      sections.push(`## mempalace_status\n\n(error: ${(err as Error).message})`);
    }
    // If the status call stalled and the server was killed, don't spend
    // another request on it; record why the diary section is missing.
    if (available) {
      try {
        const diary = await client.callTool("mempalace_diary_read", {
          agent_name: agentName,
          last_n: 5,
        });
        const text = extractText(diary);
        if (text) sections.push(`## mempalace_diary_read (agent=${agentName}, last_n=5)\n\n${text}`);
      } catch (err) {
        sections.push(`## mempalace_diary_read\n\n(error: ${(err as Error).message})`);
      }
    } else {
      sections.push(`## mempalace_diary_read\n\n(error: mempalace-mcp not available)`);
    }
    if (sections.length === 0) return;
    const body =
      `MemPalace wake-up context (auto-injected by the mempalace extension). ` +
      `This is your palace orientation for this session — do not announce it to the user, ` +
      `just use it to inform your answers. Agent identity for diary tools: "${agentName}".\n\n` +
      sections.join("\n\n---\n\n");
    return {
      message: {
        customType: "mempalace-wakeup",
        content: body,
        display: true,
      },
    };
  });
  // --- Manual wind-down (mempalace skill Phase 3) ---
  pi.registerCommand("mempalace-diary", {
    description: "Ask the LLM to write an AAAK diary entry summarizing this session",
    handler: async (args, ctx) => {
      if (!available) {
        ctx.ui.notify("mempalace bridge not available", "warning");
        return;
      }
      const topic = args.trim() || "session-summary";
      // JSON.stringify quotes the values exactly like `"${...}"` for plain
      // strings, but also escapes embedded quotes/backslashes in user input.
      const prompt =
        `Write a MemPalace diary entry for this session using the AAAK format ` +
        `described in the mempalace skill. Call mempalace_diary_write with ` +
        `agent_name=${JSON.stringify(agentName)}, topic=${JSON.stringify(topic)}, and a compressed AAAK entry ` +
        `that summarizes what we worked on, what was discovered, and any open ` +
        `threads. Then confirm the write succeeded. Do not ask me for ` +
        `clarification — draft from the session so far.`;
      pi.sendUserMessage(prompt);
    },
  });
}
/**
 * Collapse an MCP tool result into plain text for context injection.
 *
 * Joins, with newlines, every non-empty string `text` field found in
 * `mcpResult.content`. If `content` is not an array, a bare string result is
 * returned as-is and anything else yields "".
 */
function extractText(mcpResult: any): string {
  const parts = mcpResult?.content;
  if (Array.isArray(parts)) {
    const texts: string[] = [];
    for (const part of parts) {
      const candidate = part?.text;
      if (typeof candidate === "string" && candidate.length > 0) {
        texts.push(candidate);
      }
    }
    return texts.join("\n");
  }
  return typeof mcpResult === "string" ? mcpResult : "";
}