e12b624cf7
A stall-kill (or any crash) of mempalace-mcp was a permanent latch: available flipped off and stayed off until pi restart. Now the next tool call transparently respawns the server and retries.

- ensureAlive(): bounded respawn with capped exponential backoff (MEMPALACE_MCP_MAX_RESPAWNS, default 2; MEMPALACE_MCP_RESPAWN_BACKOFF_MS, default 1000). Respawn budget resets on any successful JSON-RPC response, so a recovered server regains full patience while a persistently-broken one hits the cap and stays down (no hot-loop).
- Init timeout default raised 120000 -> 300000 (scoped to init only): a genuine virtiofs cold-open shouldn't be killed mid-progress only to respawn and re-pay the same cost. Per-call timeout stays 60000.
- Concurrency hardening: generation counter so a late exit from a killed old process can't tear down a fresh respawn; explicit healthy flag replaces racy proc!=null liveness check.
- README: document self-heal, new env vars, and why generous-init + bounded-respawn compose rather than overlap.
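The respawn budget described above can be sketched in isolation. This is a minimal illustration, not the extension's code: `RespawnBudget` is a hypothetical name, and the only parts taken from the commit are the delay formula (base backoff doubled per attempt), the defaults (2 respawns, 1000 ms), and the reset on success.

```typescript
// Hypothetical helper (not part of the extension): tracks consecutive respawn
// attempts and yields the delay before each one.
class RespawnBudget {
  private used = 0;
  private readonly max: number;
  private readonly baseMs: number;

  constructor(max: number, baseMs: number) {
    this.max = max;
    this.baseMs = baseMs;
  }

  /** Delay (ms) before the next attempt, or null once the budget is spent. */
  next(): number | null {
    if (this.used >= this.max) return null;
    this.used += 1;
    return this.baseMs * 2 ** (this.used - 1);
  }

  /** Call on any successful response: the server has proven it is live. */
  reset(): void {
    this.used = 0;
  }
}

// Defaults from the commit message: 2 respawns, 1000 ms base backoff.
const budget = new RespawnBudget(2, 1_000);
const first = budget.next(); // 1000
const second = budget.next(); // 2000
const third = budget.next(); // null: budget exhausted, stay down
budget.reset(); // a successful response restores the full budget
const afterReset = budget.next(); // 1000 again
console.log(first, second, third, afterReset);
```

With the defaults, a persistently broken server costs at most two respawn attempts (after 1 s and 2 s) before the bridge stops trying, while a single good response puts the counter back to zero.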
502 lines
18 KiB
TypeScript
/**
 * MemPalace ↔ pi bridge.
 *
 * Spawns the `mempalace-mcp` MCP stdio server as a subprocess, performs the
 * MCP `initialize` handshake, lists available tools, and registers each
 * one as a pi tool that proxies to `tools/call`.
 *
 * Lifecycle automation (per ~/.agents/skills/mempalace/SKILL.md):
 *   - Wake-up (auto): on first user prompt of a fresh session, inject
 *     `mempalace_status` + `mempalace_diary_read` output as context so the
 *     agent orients itself the way the mempalace skill describes. Skipped
 *     on resume/fork (palace context is already in the thread).
 *   - Wind-down (manual): `/mempalace-diary` command prompts the LLM to
 *     write an AAAK-formatted diary entry. Not fully auto because pi
 *     sessions are typically short/tactical and session_shutdown is too
 *     late to drive an LLM turn.
 *
 * Identity: `agent_name` for diary calls comes from $MEMPALACE_AGENT_NAME,
 * defaulting to "pi". First diary write creates `wing_pi`.
 *
 * Fail-soft: if the MCP subprocess can't start, pi keeps working without
 * palace tools (warning on stderr only).
 *
 * Stall protection: every JSON-RPC request carries a timeout. If
 * `mempalace-mcp` wedges (e.g. OrbStack virtiofs cold-open of a large
 * chroma.sqlite3 / HNSW load), the awaiting promise would otherwise hang
 * forever and freeze the pi TUI (ESC cancels the LLM stream, not a pending
 * tool execution). On timeout we reject the request AND kill the wedged
 * child, so pi gets an error instead of hanging and later calls fail fast.
 * This is a per-REQUEST timeout, not a process-lifetime one: the
 * long-lived server is only killed when a request genuinely stalls.
 *   - MEMPALACE_MCP_TIMEOUT_MS       tool-call/request timeout (default 60000)
 *   - MEMPALACE_MCP_INIT_TIMEOUT_MS  initialize+tools/list timeout (default 300000)
 *   Set either to 0 to disable (legacy unbounded behavior).
 *
 * Self-heal (respawn): a stall-kill (or any crash) is no longer a permanent
 * latch. The next tool call transparently respawns `mempalace-mcp` and
 * retries, with capped exponential backoff so a persistently-broken server
 * can't hot-loop. The respawn budget resets on ANY successful JSON-RPC
 * response (proof the server is actually live), so a recovered server
 * regains full patience. The two timeouts above are deliberately split:
 * the long INIT timeout lets a genuine first cold-open finish without being
 * killed (the original incident), while the short per-call timeout still
 * aggressively kills a stuck query. After a server has opened the palace
 * once, the OS page cache is warm, so respawn cold-opens are fast, which
 * is exactly why a generous INIT timeout + bounded respawn compose well
 * instead of overlapping.
 *   - MEMPALACE_MCP_MAX_RESPAWNS        respawn attempts before giving up (default 2; 0 disables self-heal)
 *   - MEMPALACE_MCP_RESPAWN_BACKOFF_MS  base backoff, doubled per attempt (default 1000)
 *
 * Debug: set MEMPALACE_EXT_DEBUG=1 to surface mempalace-mcp stderr.
 */

import { type ChildProcessWithoutNullStreams, spawn } from "node:child_process";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import { Type } from "typebox";

// Minimal MCP stdio JSON-RPC client. MCP uses newline-delimited JSON.
interface McpTool {
  name: string;
  description?: string;
  inputSchema?: unknown;
}

interface Pending {
  resolve: (v: any) => void;
  reject: (e: Error) => void;
  timer: ReturnType<typeof setTimeout> | null;
}

const num = (envVal: string | undefined, fallback: number): number => {
  // Treat unset and blank values as "not configured": Number("") is 0, which
  // would otherwise silently disable a timeout or the respawn budget.
  if (envVal === undefined || envVal.trim() === "") return fallback;
  const n = Number(envVal);
  return Number.isFinite(n) && n >= 0 ? n : fallback;
};

const sleep = (ms: number): Promise<void> =>
  new Promise((res) => {
    const t = setTimeout(res, ms);
    if (typeof t.unref === "function") t.unref();
  });

class McpClient {
  private proc: ChildProcessWithoutNullStreams | null = null;
  private nextId = 1;
  private pending = new Map<number, Pending>();
  private stdoutBuf = "";
  private ready: Promise<void> | null = null;
  public tools: McpTool[] = [];

  // Per-request timeouts (ms). 0 = disabled (unbounded, legacy behavior).
  private requestTimeoutMs = num(process.env.MEMPALACE_MCP_TIMEOUT_MS, 60_000);
  private initTimeoutMs = num(process.env.MEMPALACE_MCP_INIT_TIMEOUT_MS, 300_000);
  // Self-heal (respawn) controls.
  private maxRespawns = num(process.env.MEMPALACE_MCP_MAX_RESPAWNS, 2);
  private respawnBackoffMs = num(process.env.MEMPALACE_MCP_RESPAWN_BACKOFF_MS, 1_000);
  private respawns = 0; // consecutive respawn attempts; reset on any success
  private reviving: Promise<boolean> | null = null;
  // Liveness: true only between a completed init and the matching death.
  // Inferring from `proc` is racy (non-null during the SIGTERM→exit window).
  private healthy = false;
  // Spawn generation. Each (re)spawn bumps it; death handlers carry the gen
  // they were attached for and no-op if a newer server has since taken over
  // (prevents a stale OLD-proc 'exit' from clobbering a freshly respawned one).
  private gen = 0;
  // Spawn args remembered so a respawn can reuse them.
  private command = "mempalace-mcp";
  private args: string[] = [];
  // Fired when the child process dies (exit or stall-kill). Lets the
  // extension flip `available` so the next tool call attempts a bounded
  // respawn instead of writing to a dead server.
  public onExit: (() => void) | null = null;

  /** True only when a server has completed init and not since died. */
  get alive(): boolean {
    return this.healthy;
  }

  async start(command: string, args: string[] = []): Promise<void> {
    if (this.ready) return this.ready;
    this.command = command;
    this.args = args;
    this.ready = (async () => {
      const myGen = ++this.gen;
      const child = spawn(command, args, { stdio: ["pipe", "pipe", "pipe"] });
      this.proc = child;
      child.on("error", (err) => this.handleDeath(myGen, err));
      child.on("exit", (code) =>
        this.handleDeath(myGen, new Error(`mempalace-mcp exited (code=${code})`)),
      );
      this.proc.stdout.setEncoding("utf8");
      this.proc.stdout.on("data", (chunk: string) => this.onStdout(chunk));
      // Drain stderr silently. Re-enable by setting MEMPALACE_EXT_DEBUG=1.
      this.proc.stderr.setEncoding("utf8");
      if (process.env.MEMPALACE_EXT_DEBUG) {
        this.proc.stderr.on("data", (chunk: string) => {
          process.stderr.write(`[mempalace-mcp stderr] ${chunk}`);
        });
      } else {
        this.proc.stderr.resume(); // drain without logging
      }

      // MCP initialize handshake. Cold-open over virtiofs can be slow, so
      // use the (longer) init timeout here rather than the per-call one.
      await this.request(
        "initialize",
        {
          protocolVersion: "2024-11-05",
          capabilities: {},
          clientInfo: { name: "pi-mempalace-ext", version: "0.1.0" },
        },
        this.initTimeoutMs,
      );
      this.notify("notifications/initialized", {});

      const listed = await this.request("tools/list", {}, this.initTimeoutMs);
      this.tools = (listed?.tools as McpTool[]) ?? [];
      // Init complete and the process is still ours: mark live so ensureAlive()
      // reports true. Guard against a death that raced in during init.
      if (myGen === this.gen) this.healthy = true;
    })();
    return this.ready;
  }

  private onStdout(chunk: string) {
    this.stdoutBuf += chunk;
    let nl: number;
    while ((nl = this.stdoutBuf.indexOf("\n")) !== -1) {
      const line = this.stdoutBuf.slice(0, nl).trim();
      this.stdoutBuf = this.stdoutBuf.slice(nl + 1);
      if (!line) continue;
      let msg: any;
      try {
        msg = JSON.parse(line);
      } catch {
        continue;
      }
      if (typeof msg.id === "number" && this.pending.has(msg.id)) {
        const p = this.pending.get(msg.id)!;
        this.settle(msg.id);
        if (msg.error) p.reject(new Error(msg.error.message ?? "MCP error"));
        else {
          // A successful response proves the server is live, so restore the
          // full respawn budget for any future (unrelated) stall.
          this.respawns = 0;
          p.resolve(msg.result);
        }
      }
      // notifications (no id) ignored for now.
    }
  }

  /** Remove a pending request and clear its timeout timer. */
  private settle(id: number) {
    const p = this.pending.get(id);
    if (p?.timer) clearTimeout(p.timer);
    this.pending.delete(id);
  }

  private failAll(err: Error) {
    for (const id of [...this.pending.keys()]) {
      const p = this.pending.get(id)!;
      this.settle(id);
      p.reject(err);
    }
  }

  /**
   * Child died (exit, spawn error, or stall-kill): reject everything.
   * Ignored if a newer generation has already taken over: a late 'exit'
   * from a killed old process must not tear down a fresh respawn.
   */
  private handleDeath(gen: number, err: Error) {
    if (gen !== this.gen) return; // stale handler from a superseded process
    this.proc = null;
    this.healthy = false;
    // Clear the memoized start() promise so a subsequent start()/ensureAlive()
    // spawns a fresh server instead of returning the dead one's promise.
    this.ready = null;
    this.failAll(err);
    try {
      this.onExit?.();
    } catch {}
  }

  /**
   * Ensure a live, initialized server, respawning a dead one with capped
   * exponential backoff. Returns whether the server is alive afterwards.
   * Concurrent callers share a single in-flight revive. The attempt counter
   * is reset by `onStdout` on any successful response, so a server that comes
   * back and works regains its full budget; a server that keeps dying hits
   * `maxRespawns` and stays down (restart pi) rather than hot-looping.
   */
  async ensureAlive(): Promise<boolean> {
    if (this.alive) return true;
    if (this.reviving) return this.reviving;
    this.reviving = (async () => {
      while (!this.alive && this.respawns < this.maxRespawns) {
        this.respawns++;
        await sleep(this.respawnBackoffMs * 2 ** (this.respawns - 1));
        // Force a fresh spawn: drop any settled (rejected/dead) start()
        // promise so start() doesn't short-circuit on a stale memo. Safe
        // because ensureAlive is serialized (single `reviving`) and only
        // runs while not healthy, so no useful in-flight start() can exist.
        this.ready = null;
        try {
          await this.start(this.command, this.args);
        } catch {
          // start() rejected (e.g. respawn cold-open also stalled and was
          // killed). Loop until the budget is exhausted.
        }
      }
      return this.alive;
    })();
    try {
      return await this.reviving;
    } finally {
      this.reviving = null;
    }
  }

  private write(obj: unknown) {
    if (!this.proc) throw new Error("MCP process not started");
    this.proc.stdin.write(`${JSON.stringify(obj)}\n`);
  }

  private notify(method: string, params: unknown) {
    this.write({ jsonrpc: "2.0", method, params });
  }

  request(method: string, params: unknown, timeoutMs = this.requestTimeoutMs): Promise<any> {
    const id = this.nextId++;
    return new Promise((resolve, reject) => {
      let timer: ReturnType<typeof setTimeout> | null = null;
      if (timeoutMs > 0) {
        timer = setTimeout(() => {
          if (!this.pending.has(id)) return;
          this.settle(id);
          reject(
            new Error(
              `mempalace-mcp request '${method}' timed out after ${timeoutMs}ms ` +
                `(server wedged — likely cold storage open). Terminating the ` +
                `stalled server; it will be respawned on the next call.`,
            ),
          );
          // Kill the wedged child so subsequent calls fail fast instead of
          // stacking up behind a dead server. The 'exit' handler
          // (handleDeath) rejects any other pending requests.
          this.kill();
        }, timeoutMs);
        // Don't let a pending MCP timer keep the event loop alive.
        if (typeof timer.unref === "function") timer.unref();
      }
      this.pending.set(id, { resolve, reject, timer });
      try {
        this.write({ jsonrpc: "2.0", id, method, params });
      } catch (err) {
        this.settle(id);
        reject(err as Error);
      }
    });
  }

  async callTool(name: string, args: Record<string, unknown>): Promise<any> {
    return this.request("tools/call", { name, arguments: args });
  }

  /** SIGTERM then SIGKILL grace, for stall recovery. */
  private kill() {
    const proc = this.proc;
    if (!proc) return;
    try {
      proc.kill("SIGTERM");
    } catch {}
    const t = setTimeout(() => {
      try {
        proc.kill("SIGKILL");
      } catch {}
    }, 2_000);
    if (typeof t.unref === "function") t.unref();
  }

  stop() {
    if (this.proc) {
      try {
        this.proc.kill("SIGTERM");
      } catch {}
      this.proc = null;
    }
  }
}

export default async function mempalaceExtension(pi: ExtensionAPI) {
  const client = new McpClient();
  let available = false;
  const agentName = process.env.MEMPALACE_AGENT_NAME ?? "pi";

  // Gate: inject wake-up context only on the first before_agent_start of a
  // fresh session. Set true on resume/fork (context already in thread).
  let wokeUp = false;

  try {
    client.onExit = () => {
      // Child died (stall-kill or crash): mark unavailable. The next tool
      // call will attempt a bounded respawn via client.ensureAlive().
      available = false;
    };
    await client.start("mempalace-mcp");
    available = true;
  } catch (err) {
    // First cold-open stalled/crashed. Give the bounded self-heal a chance
    // before giving up entirely: a respawn often succeeds against a now-warm
    // page cache. Only fail-soft if even that is exhausted.
    process.stderr.write(
      `[mempalace ext] mempalace-mcp start failed: ${(err as Error).message} — attempting respawn\n`,
    );
    available = await client.ensureAlive();
    if (!available) {
      process.stderr.write(
        "[mempalace ext] mempalace-mcp unavailable after retries; continuing without palace tools\n",
      );
      return; // fail-soft: pi keeps working without palace tools
    }
  }

  // Register MCP tools as pi tools. Pass the MCP `inputSchema` through as
  // the pi `parameters` schema so the LLM sees the real parameter names
  // (e.g. `agent_name`, not guessed `agent`). TypeBox schemas are plain
  // JSON Schema at runtime, so `Type.Unsafe` is sufficient to wrap an
  // externally-sourced JSON Schema; no conversion needed.
  for (const tool of client.tools) {
    const schema =
      tool.inputSchema && typeof tool.inputSchema === "object"
        ? (Type.Unsafe<Record<string, unknown>>(tool.inputSchema as object) as unknown as ReturnType<typeof Type.Object>)
        : Type.Object({}, { additionalProperties: true });
    pi.registerTool({
      name: tool.name,
      label: tool.name,
      description: tool.description ?? `MemPalace tool: ${tool.name}`,
      parameters: schema,
      async execute(_toolCallId, params) {
        if (!available) {
          // Stall-kill/crash is not a permanent latch: try a bounded respawn.
          available = await client.ensureAlive();
        }
        if (!available) {
          return {
            content: [
              {
                type: "text",
                text: "mempalace-mcp not available (respawn budget exhausted) — restart pi to retry palace tools",
              },
            ],
            details: {},
            isError: true,
          };
        }
        try {
          const result = await client.callTool(tool.name, (params ?? {}) as Record<string, unknown>);
          // MCP tool results use { content: [...], isError?: boolean }
          return {
            content: result?.content ?? [{ type: "text", text: JSON.stringify(result) }],
            details: { raw: result },
            isError: result?.isError === true,
          };
        } catch (err) {
          return {
            content: [{ type: "text", text: `MCP call failed: ${(err as Error).message}` }],
            details: {},
            isError: true,
          };
        }
      },
    });
  }

  pi.on("session_shutdown", async () => {
    client.stop();
  });

  pi.on("session_start", async (event, ctx) => {
    // On resume/fork, the previous session's palace context is already in
    // the thread, so skip the wake-up injection.
    if (event.reason === "resume" || event.reason === "fork") {
      wokeUp = true;
    }
    ctx.ui.notify(
      `mempalace bridge: ${client.tools.length} tools registered (agent=${agentName})`,
      "info",
    );
  });

  // --- Auto wake-up (mempalace skill Phase 1) ---
  pi.on("before_agent_start", async (_event, _ctx) => {
    if (wokeUp || !available) return;
    wokeUp = true; // one-shot, even if the calls below fail

    const sections: string[] = [];
    try {
      const status = await client.callTool("mempalace_status", {});
      const text = extractText(status);
      if (text) sections.push(`## mempalace_status\n\n${text}`);
    } catch (err) {
      sections.push(`## mempalace_status\n\n(error: ${(err as Error).message})`);
    }
    try {
      const diary = await client.callTool("mempalace_diary_read", {
        agent_name: agentName,
        last_n: 5,
      });
      const text = extractText(diary);
      if (text) sections.push(`## mempalace_diary_read (agent=${agentName}, last_n=5)\n\n${text}`);
    } catch (err) {
      sections.push(`## mempalace_diary_read\n\n(error: ${(err as Error).message})`);
    }

    if (sections.length === 0) return;

    const body =
      `MemPalace wake-up context (auto-injected by the mempalace extension). ` +
      `This is your palace orientation for this session — do not announce it to the user, ` +
      `just use it to inform your answers. Agent identity for diary tools: "${agentName}".\n\n` +
      sections.join("\n\n---\n\n");

    return {
      message: {
        customType: "mempalace-wakeup",
        content: body,
        display: true,
      },
    };
  });

  // --- Manual wind-down (mempalace skill Phase 3) ---
  pi.registerCommand("mempalace-diary", {
    description: "Ask the LLM to write an AAAK diary entry summarizing this session",
    handler: async (args, ctx) => {
      if (!available) {
        ctx.ui.notify("mempalace bridge not available", "warning");
        return;
      }
      const topic = args.trim() || "session-summary";
      const prompt =
        `Write a MemPalace diary entry for this session using the AAAK format ` +
        `described in the mempalace skill. Call mempalace_diary_write with ` +
        `agent_name="${agentName}", topic="${topic}", and a compressed AAAK entry ` +
        `that summarizes what we worked on, what was discovered, and any open ` +
        `threads. Then confirm the write succeeded. Do not ask me for ` +
        `clarification — draft from the session so far.`;
      pi.sendUserMessage(prompt);
    },
  });
}

/** Flatten MCP tool result content into plain text for context injection. */
function extractText(mcpResult: any): string {
  const parts = mcpResult?.content;
  if (!Array.isArray(parts)) return typeof mcpResult === "string" ? mcpResult : "";
  return parts
    .map((p: any) => (typeof p?.text === "string" ? p.text : ""))
    .filter(Boolean)
    .join("\n");
}
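The generation guard used by `handleDeath` can be illustrated on its own. The sketch below is an assumption-laden stand-in, not the bridge's code: `GenerationGuard` and its methods are hypothetical names, and it marks the server healthy at spawn time, whereas the real client only does so after init completes.

```typescript
// Hypothetical stand-in for the client's `gen` / `healthy` bookkeeping.
class GenerationGuard {
  private gen = 0;
  private healthy = false;

  /** Start a new generation; returns the token its death handler must carry. */
  spawn(): number {
    this.gen += 1;
    this.healthy = true; // simplified: the real client sets this after init
    return this.gen;
  }

  /** Returns true if the death was applied, false if the token was stale. */
  handleDeath(token: number): boolean {
    if (token !== this.gen) return false; // superseded process, ignore
    this.healthy = false;
    return true;
  }

  get alive(): boolean {
    return this.healthy;
  }
}

const guard = new GenerationGuard();
const oldToken = guard.spawn();
const newToken = guard.spawn(); // respawn happens before the old 'exit' arrives

const staleApplied = guard.handleDeath(oldToken); // false: late exit is ignored
const aliveAfterStale = guard.alive; // true: fresh server untouched
const currentApplied = guard.handleDeath(newToken); // true: real death
const aliveAfterCurrent = guard.alive; // false
console.log(staleApplied, aliveAfterStale, currentApplied, aliveAfterCurrent);
```

Carrying the token in the handler closure, rather than comparing process objects, keeps the check valid even after the reference to the old process has been overwritten.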