mempalace-toolkit/bin/mempalace-pi-session
joakimp 6352373a1f fix(feeders): make post-mine repair opt-in, not default
The three feeder wrappers (mempalace-docs, mempalace-pi-session,
mempalace-session) unconditionally ran 'mempalace repair --yes' after
mining, controllable only via --no-repair opt-out. The contrib launchd
and systemd templates did not pass --no-repair, so every scheduled tick
invoked the destructive in-place HNSW rebuild.

This has bitten us twice:
  - 2026-05-04 09:08: a kickstart triggered repair while an MCP
    subprocess held the DB open; the live collection was wiped (0
    drawers) and had to be restored from the palace.backup snapshot.
  - 2026-05-05 10:00: post-mine repair crashed mid-rebuild with
    'NotFoundError: Collection [<uuid>] does not exist' - chromadb's
    rebuild recreated the collection under a new UUID while the code
    still held the old handle. Live DB survived only by luck (crash
    hit before the swap).

Fix: flip the default.
  - New flag: --repair (opt-in). Prints a warning and sleeps 3s before
    invoking 'mempalace repair --yes'.
  - --no-repair is retained as a deprecated no-op alias for backward
    compatibility with any scripts/units still passing it.
  - Default behavior: no repair. Routine ChromaDB add() keeps HNSW
    consistent; repair is a recovery op, not a maintenance tick.

Docs updated to match: README, SKILL, ARCHITECTURE, AGENTS,
contrib/README. Scheduling guidance now explicitly warns against
enabling --repair on cron/launchd/systemd-timer runs.
2026-05-05 12:35:04 +02:00
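
The default flip described in this commit can be sketched in isolation. This is a minimal illustration, not the wrapper's actual code: `parse_repair_flag` is a hypothetical helper that prints 1 only when `--repair` is passed and treats `--no-repair` as an accepted but ignored alias.

```shell
# Hypothetical helper (not part of the toolkit): prints 1 when repair was
# explicitly requested, 0 otherwise.
parse_repair_flag() {
  local do_repair=0                 # new default: never repair unless asked
  while [ $# -gt 0 ]; do
    case "$1" in
      --repair)    do_repair=1 ;;   # explicit opt-in
      --no-repair) ;;               # deprecated alias: accepted, ignored
      *)           ;;               # other options are out of scope here
    esac
    shift
  done
  printf '%s\n' "$do_repair"
}

parse_repair_flag                   # prints 0 (no flags: repair stays off)
parse_repair_flag --no-repair       # prints 0 (legacy units still work)
parse_repair_flag --repair          # prints 1 (explicit opt-in)
```

Because the alias is ignored rather than rejected, launchd or systemd units that still pass `--no-repair` keep working and simply get the new default.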

#!/usr/bin/env bash
# mempalace-pi-session — mine pi coding-agent session history into MemPalace
#
# Pi persists every session (verbatim user/assistant turns + tool calls + tool
# results) as newline-delimited JSONL under ~/.pi/agent/sessions/. Pi has no
# upstream MemPalace integration and mempalace-toolkit's existing wrapper
# (`mempalace-session`) only handles opencode's SQLite DB, so pi sessions are
# currently invisible to the palace.
#
# Strategy (mirrors mempalace-session):
# 1. Walk ~/.pi/agent/sessions/**/*.jsonl and export each qualifying session
# to a Claude Code JSONL file (format the mempalace normalizer speaks).
# 2. Stage exports under ~/.cache/mempalace-pi-session/<wing>/.
# 3. Run `mempalace mine --mode convos` against the staging dir.
#
# Labelling: every exported transcript begins with a synthetic header
# [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
# so post-mine search results are self-identifying (pi vs opencode vs other).
#
# Dedup: mempalace convos mode keys on source_file (absolute staging path).
# Staging paths are deterministic per pi session UUID, so re-runs are
# idempotent until session content actually changes.
#
# Session filter: sessions with fewer than --min-messages *user+assistant*
# messages (default 3) are skipped to avoid filing single-prompt throwaways.
#
# Usage:
# mempalace-pi-session
# mempalace-pi-session --wing <name>
# mempalace-pi-session --session <uuid-prefix>
# mempalace-pi-session --since 2026-04-01
# mempalace-pi-session --min-messages 6
# mempalace-pi-session --dry-run
# mempalace-pi-session --help
#
# Exit codes:
# 0 success
# 1 usage / argument error
# 2 pi sessions dir missing
# 3 mempalace CLI not installed
# 4 mine failed
#
# Dependencies: bash, python3 (stdlib only), mempalace (v3.3.3+)
set -euo pipefail
# ── Defaults ─────────────────────────────────────────────────────────
AGENT="${USER:-mempalace}"
WING="wing_conversations"
SESSION_ID=""
SINCE=""
MIN_MESSAGES=3
DRY_RUN=0
DO_REPAIR=0
PI_SESSIONS_DIR="${PI_SESSIONS_DIR:-$HOME/.pi/agent/sessions}"
# ── Usage ────────────────────────────────────────────────────────────
usage() {
  cat <<'EOF'
mempalace-pi-session — mine pi coding-agent session history into MemPalace

Usage:
  mempalace-pi-session [options]

Options:
  --wing <name>          Target wing (default: wing_conversations)
  --session <prefix>     Export one session only (match on UUID prefix)
  --since <YYYY-MM-DD>   Only sessions last modified on/after this date
  --min-messages <N>     Skip sessions with fewer than N user+assistant
                         turns (default: 3)
  --agent <name>         Agent name recorded on drawers (default: $USER)
  --sessions-dir <path>  Path to pi sessions dir (default: $PI_SESSIONS_DIR
                         or ~/.pi/agent/sessions)
  --dry-run              Export + list; do not mine into palace. Each session
                         is tagged [NEW] or [SKIP] based on whether its
                         source_file is already in the palace.
  --repair               Run `mempalace repair` after mining (opt-in).
                         WARNING: repair does a destructive in-place HNSW
                         rebuild. If it races a live MCP connection or
                         crashes mid-rebuild, it can wipe the collection.
                         Only pass this from a quiet, interactive context.
                         Not safe for unattended cron/launchd schedules.
  --no-repair            (Deprecated; no-repair is now the default.)
  -h, --help             Show this help

Idempotency:
  Re-running on the same corpus is safe. The export step writes every
  qualifying session to the cache; the mine step dedups by source_file so
  already-filed sessions are skipped without re-embedding.

Transcript shape per session:
  - Synthetic header as first user turn:
      [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
  - User/assistant messages extracted from pi JSONL `message` entries
  - Assistant toolCall blocks → Claude Code `tool_use` blocks
  - `toolResult` role messages → `tool_result` blocks (folded back into
    the assistant turn by the normalizer)
  - `bashExecution`, `custom(display=true)`, `branchSummary`,
    `compactionSummary` → rendered as text annotations
  - `thinking` content blocks → dropped (noise)
  - Image content blocks → dropped (palace embeds text only)

Dedup:
  - source_file = absolute staging path (deterministic per pi session UUID)
  - Re-runs skip unchanged sessions. To force re-mining, delete the staging
    dir: rm -rf ~/.cache/mempalace-pi-session/<wing>/

Rationale:
  Pi's extension ecosystem could, in principle, stream sessions into the
  palace as they happen, but that requires a resident MCP connection during
  every pi session. This wrapper is the batch, harness-agnostic alternative:
  it reads the durable on-disk JSONL and mines it on a schedule.
EOF
}
# ── Parse args ───────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
  case "$1" in
    -h|--help)      usage; exit 0 ;;
    --wing)         WING="${2:-}"; shift 2 ;;
    --session)      SESSION_ID="${2:-}"; shift 2 ;;
    --since)        SINCE="${2:-}"; shift 2 ;;
    --min-messages) MIN_MESSAGES="${2:-}"; shift 2 ;;
    --agent)        AGENT="${2:-}"; shift 2 ;;
    --sessions-dir) PI_SESSIONS_DIR="${2:-}"; shift 2 ;;
    --dry-run)      DRY_RUN=1; shift ;;
    --repair)       DO_REPAIR=1; shift ;;
    --no-repair)    shift ;;  # deprecated alias; no-repair is the default
    --)             shift; break ;;
    -*)             echo "error: unknown option: $1" >&2; usage >&2; exit 1 ;;
    *)              echo "error: unexpected arg: $1" >&2; exit 1 ;;
  esac
done
# ── Preflight ────────────────────────────────────────────────────────
if [[ ! -d "$PI_SESSIONS_DIR" ]]; then
  echo "error: pi sessions dir not found at $PI_SESSIONS_DIR" >&2
  echo "       override with --sessions-dir <path> or PI_SESSIONS_DIR env var" >&2
  exit 2
fi
if ! command -v mempalace >/dev/null 2>&1; then
  echo "error: mempalace CLI not found in PATH" >&2
  exit 3
fi
if ! [[ "$MIN_MESSAGES" =~ ^[0-9]+$ ]]; then
  echo "error: --min-messages must be an integer" >&2
  exit 1
fi
# ── Staging dir ──────────────────────────────────────────────────────
CACHE_ROOT="${XDG_CACHE_HOME:-$HOME/.cache}/mempalace-pi-session"
STAGE="$CACHE_ROOT/$WING"
mkdir -p "$STAGE"
# ── Export sessions (Python heredoc) ────────────────────────────────
# Parses pi JSONL files and writes Claude Code JSONL per session into $STAGE.
# Also classifies each export as NEW or SKIP (already filed, by source_file
# lookup) so --dry-run reports the real mine-set size. Classification is
# advisory; `mempalace mine --mode convos` remains the authoritative dedup.
export_count=$(python3 - "$PI_SESSIONS_DIR" "$STAGE" "$SESSION_ID" "$SINCE" "$MIN_MESSAGES" <<'PY'
import json, os, sqlite3, sys
from datetime import datetime, timezone
from pathlib import Path
sessions_dir, stage, session_filter, since, min_messages = sys.argv[1:6]
min_messages = int(min_messages)
stage = Path(stage)
sessions_dir = Path(sessions_dir)
# Convert --since YYYY-MM-DD to epoch seconds (comparing against file mtime)
since_epoch = None
if since:
    try:
        since_epoch = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
    except ValueError:
        print(f"error: --since must be YYYY-MM-DD, got {since!r}", file=sys.stderr)
        sys.exit(1)
# ── Load palace's already-filed source_files (best-effort, read-only) ──
already_filed = set()
palace_path = os.environ.get("MEMPALACE_PATH", os.path.expanduser("~/.mempalace/palace"))
chroma_db = Path(palace_path) / "chroma.sqlite3"
if chroma_db.is_file():
    try:
        pcon = sqlite3.connect(f"file:{chroma_db}?mode=ro", uri=True)
        for (sf,) in pcon.execute(
            "SELECT DISTINCT string_value FROM embedding_metadata "
            "WHERE key='source_file' AND string_value LIKE ?",
            (f"{stage}%",),
        ):
            if sf:
                already_filed.add(sf)
        pcon.close()
    except sqlite3.Error:
        pass  # palace unreachable → miner will dedup
def extract_text(content):
    """Flatten a message content (string | list-of-blocks) to plain text.

    Drops image + thinking blocks, joins text blocks, and converts toolCall
    blocks into Claude Code tool_use dicts. Returns
    (text, tool_uses, tool_results); tool_results is always empty here
    because toolResult messages are assembled by the caller.
    """
    if isinstance(content, str):
        return content, [], []
    if not isinstance(content, list):
        return "", [], []
    text_parts = []
    tool_uses = []
    for block in content:
        if not isinstance(block, dict):
            continue
        bt = block.get("type")
        if bt == "text":
            t = block.get("text", "")
            if t:
                text_parts.append(t)
        elif bt == "thinking":
            # Drop reasoning content — high-noise, low-signal for search.
            continue
        elif bt == "image":
            # Palace is text-only.
            continue
        elif bt == "toolCall":
            tool_uses.append({
                "type": "tool_use",
                "id": block.get("id") or "",
                "name": block.get("name") or "tool",
                "input": block.get("arguments") or {},
            })
    return "\n".join(text_parts), tool_uses, []
def load_session(path: Path):
    """Parse a pi JSONL session file. Returns (header, entries) or None."""
    try:
        with path.open("r", encoding="utf-8") as f:
            lines = [ln for ln in f.read().splitlines() if ln.strip()]
    except OSError:
        return None
    if not lines:
        return None
    try:
        header = json.loads(lines[0])
    except json.JSONDecodeError:
        return None
    if header.get("type") != "session":
        return None
    entries = []
    for ln in lines[1:]:
        try:
            entries.append(json.loads(ln))
        except json.JSONDecodeError:
            continue
    return header, entries
def derive_title(entries, fallback: str) -> str:
    """Prefer session_info.name; else truncated first user message."""
    # session_info entries: most-recent wins
    name = None
    for e in entries:
        if e.get("type") == "session_info" and e.get("name"):
            name = e["name"]
    if name:
        return name[:120]
    for e in entries:
        if e.get("type") != "message":
            continue
        msg = e.get("message") or {}
        if msg.get("role") != "user":
            continue
        text, _, _ = extract_text(msg.get("content"))
        text = " ".join(text.split())  # collapse whitespace
        if text:
            return (text[:80] + "…") if len(text) > 80 else text
    return fallback
# Discover session files
paths = sorted(sessions_dir.rglob("*.jsonl"))
if session_filter:
    paths = [p for p in paths if session_filter in p.name]
exported = 0
skipped_short = 0
skipped_malformed = 0
skipped_already_filed = 0
for path in paths:
    try:
        mtime = path.stat().st_mtime
    except OSError:
        continue
    if since_epoch is not None and mtime < since_epoch:
        continue
    parsed = load_session(path)
    if parsed is None:
        skipped_malformed += 1
        continue
    header, entries = parsed
    session_uuid = header.get("id") or path.stem
    cwd = header.get("cwd") or "?"
    header_ts = header.get("timestamp") or ""
    try:
        date_str = header_ts[:10] if header_ts else datetime.fromtimestamp(
            mtime, tz=timezone.utc).strftime("%Y-%m-%d")
    except Exception:
        date_str = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime("%Y-%m-%d")
    # Count user+assistant message entries for the min-messages filter
    turn_count = sum(
        1 for e in entries
        if e.get("type") == "message"
        and (e.get("message") or {}).get("role") in ("user", "assistant")
    )
    if turn_count < min_messages:
        skipped_short += 1
        continue
    title = derive_title(entries, fallback=session_uuid[:8])
    out_lines = []
    out_lines.append({
        "type": "user",
        "message": {
            "content": f"[session: {title} | {cwd} | {date_str} | source: pi]"
        },
    })
    for e in entries:
        t = e.get("type")
        if t == "message":
            msg = e.get("message") or {}
            role = msg.get("role")
            if role == "user":
                text, _, _ = extract_text(msg.get("content"))
                if text.strip():
                    out_lines.append({"type": "user", "message": {"content": text}})
            elif role == "assistant":
                text, tool_uses, _ = extract_text(msg.get("content"))
                blocks = []
                if text.strip():
                    blocks.append({"type": "text", "text": text})
                blocks.extend(tool_uses)
                if not blocks:
                    continue
                # Simplify single-text to string (matches mempalace-session).
                if len(blocks) == 1 and blocks[0].get("type") == "text":
                    content = blocks[0]["text"]
                else:
                    content = blocks
                out_lines.append({"type": "assistant", "message": {"content": content}})
            elif role == "toolResult":
                text, _, _ = extract_text(msg.get("content"))
                tool_id = msg.get("toolCallId") or ""
                if not tool_id:
                    continue
                out_lines.append({
                    "type": "human",
                    "message": {
                        "content": [{
                            "type": "tool_result",
                            "tool_use_id": tool_id,
                            "content": text or "(no output)",
                        }],
                    },
                })
            elif role == "bashExecution":
                # Rendered as a synthetic user annotation (tagged [user-bash])
                # so the command + output stay associated with the
                # surrounding turn.
                cmd = msg.get("command") or ""
                out = msg.get("output") or ""
                exit_code = msg.get("exitCode")
                note = f"[user-bash] $ {cmd}\nexit={exit_code}\n{out}".strip()
                if note:
                    out_lines.append({"type": "user", "message": {"content": note}})
            elif role == "custom":
                if not msg.get("display"):
                    continue
                text, _, _ = extract_text(msg.get("content"))
                if text.strip():
                    ctype = msg.get("customType") or "custom"
                    out_lines.append({
                        "type": "user",
                        "message": {"content": f"[custom:{ctype}] {text}"},
                    })
            elif role in ("branchSummary", "compactionSummary"):
                summary = msg.get("summary") or ""
                if summary.strip():
                    out_lines.append({
                        "type": "user",
                        "message": {"content": f"[{role}] {summary}"},
                    })
            # thinking-only / empty messages silently dropped
        elif t in (
            "model_change", "thinking_level_change", "compaction",
            "branch_summary", "label", "session_info", "custom",
            "custom_message",
        ):
            # Non-conversational entries: drop. (custom_message with
            # display=true could be included but we already get it via the
            # "custom" message role above when pi materializes one.)
            continue
    # Need at least 2 turns (header + one real turn) for the normalizer.
    if len(out_lines) < 2:
        skipped_short += 1
        continue
    out_path = stage / f"pi_{session_uuid}.jsonl"
    with out_path.open("w", encoding="utf-8") as f:
        for obj in out_lines:
            f.write(json.dumps(obj, ensure_ascii=False) + "\n")
    # Preserve session mtime on the staging file for dedup stability.
    try:
        os.utime(out_path, (mtime, mtime))
    except OSError:
        pass
    exported += 1
    is_filed = str(out_path) in already_filed
    if is_filed:
        skipped_already_filed += 1
    status = "SKIP" if is_filed else "NEW "
    print(f"  [{status}] {out_path.name} ({turn_count} turns)", file=sys.stderr)
print(f"EXPORTED {exported}")
print(f"ALREADY_FILED {skipped_already_filed}")
if skipped_short:
    print(f"SKIPPED_SHORT {skipped_short}", file=sys.stderr)
if skipped_malformed:
    print(f"SKIPPED_MALFORMED {skipped_malformed}", file=sys.stderr)
PY
)
# Parse counts from stdout
count="$(printf '%s\n' "$export_count" | awk '/^EXPORTED / { print $2 }')"
count="${count:-0}"
already_filed="$(printf '%s\n' "$export_count" | awk '/^ALREADY_FILED / { print $2 }')"
already_filed="${already_filed:-0}"
to_file=$(( count - already_filed ))
if [[ "$count" -eq 0 ]]; then
  echo "no sessions qualified for export"
  exit 0
fi
echo ""
echo "Exported $count session(s) to $STAGE"
echo " $to_file new → will be filed on mine"
echo " $already_filed already filed → will be skipped (dedup by source_file)"
if [[ $DRY_RUN -eq 1 ]]; then
  if [[ "$to_file" -eq 0 ]]; then
    echo ""
    echo "--dry-run: no new sessions to mine. A real run would skip all $count."
  else
    echo ""
    echo "--dry-run: skipping mine step. A real run would file $to_file new session(s)."
  fi
  exit 0
fi
# ── Run the mine ─────────────────────────────────────────────────────
echo ""
echo "Mining into wing '$WING'..."
if ! mempalace mine "$STAGE" --mode convos --wing "$WING" --agent "$AGENT"; then
  echo "error: mempalace mine failed" >&2
  exit 4
fi
# ── Repair index ─────────────────────────────────────────────────────
if [[ $DO_REPAIR -eq 1 ]]; then
  echo ""
  echo "WARNING: --repair runs an in-place HNSW rebuild that has wiped"
  echo "         live palaces on past runs. Proceeding in 3 seconds..."
  sleep 3
  echo "Rebuilding HNSW index..."
  mempalace repair --yes
fi
echo ""
echo "Done. Wing '$WING' updated. Remember to reconnect any live MCP sessions."
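
The hand-off between the Python exporter and the shell wrapper relies on `KEY value` summary lines on stdout, while per-session progress goes to stderr. The snippet below is a minimal sketch of that parsing step under stated assumptions: `summary_value` is a hypothetical helper, and the sample counts are made up for illustration.

```shell
# Hypothetical helper: print the number that follows KEY in the summary
# text, falling back to 0 when the key is absent.
summary_value() {
  key="$1"
  text="$2"
  value="$(printf '%s\n' "$text" | awk -v k="$key" '$1 == k { print $2 }')"
  printf '%s\n' "${value:-0}"
}

# Made-up exporter summary, in the same shape the heredoc prints.
sample="EXPORTED 5
ALREADY_FILED 2"

exported="$(summary_value EXPORTED "$sample")"
filed="$(summary_value ALREADY_FILED "$sample")"
echo "to file: $(( exported - filed ))"   # prints "to file: 3"
```

Defaulting a missing key to 0 keeps the arithmetic valid under `set -u` even if the exporter prints nothing for that counter.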