mempalace-toolkit/bin/mempalace-pi-session
joakimp 9450a45194 feat(pi-session): add mempalace-pi-session feeder for pi coding-agent sessions
Parallel to mempalace-session, this wrapper walks ~/.pi/agent/sessions/
JSONL files and mines qualifying sessions into wing_conversations via
'mempalace mine --mode convos'.

Design choices mirror mempalace-session:
- Export-stage-mine idiom with deterministic per-session staging paths
  under ~/.cache/mempalace-pi-session/<wing>/, so 'mempalace mine' dedup
  on source_file makes re-runs idempotent.
- --dry-run classifies each export as [NEW] or [SKIP] by matching staging
  path against the palace's already-filed source_files.
- --min-messages filter skips throwaway single-prompt sessions.
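
The deterministic staging path and the dry-run classification described above can be sketched in a few lines of Python. This is a minimal illustration, not the wrapper's own code: `staging_path` and `classify` are hypothetical helper names, the cache location is a made-up example, and the set of already-filed paths is passed in directly instead of being read from the palace.

```python
from pathlib import Path


def staging_path(cache_root, wing, session_uuid):
    """Deterministic per-session staging path: same UUID, same path."""
    return Path(cache_root) / wing / f"pi_{session_uuid}.jsonl"


def classify(path, already_filed):
    """Tag an export SKIP if its staging path is a known source_file, else NEW."""
    return "SKIP" if str(path) in already_filed else "NEW"


# Hypothetical cache location and session ids, for illustration only.
cache_root = "/home/me/.cache/mempalace-pi-session"
first = staging_path(cache_root, "wing_conversations", "abc123")
again = staging_path(cache_root, "wing_conversations", "abc123")
other = staging_path(cache_root, "wing_conversations", "def456")

filed = {str(first)}  # pretend the first session was mined on an earlier run
print(classify(again, filed), classify(other, filed))
```

Because the path depends only on the wing and the session UUID, a second export of the same session maps to the same source_file, which is what lets a source_file-keyed dedup treat re-runs as no-ops.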

Pi-specific parsing:
- Pi JSONL is a typed tree (id/parentId) per docs/session-format.md;
  this walks in file order, which is correct for the overwhelmingly
  linear case and harmlessly duplicative on branched sessions (palace
  semantic dedup handles it).
- Roles mapped to Claude Code JSONL shape:
    user      -> {type:user, content:text}
    assistant -> {type:assistant, content:[text, tool_use]}
    toolResult-> {type:human, content:[tool_result]} (folded back by normalizer)
    bashExecution/custom(display)/branchSummary/compactionSummary
              -> rendered as text annotations
- thinking blocks and image blocks dropped (noise / palace is text-only).
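
The role mapping above can be illustrated with a small, self-contained sketch. It assumes the pi field names that the script in this file reads (`role`, `content`, `toolCall`, `arguments`, `toolCallId`); `flatten_text` and `map_message` are hypothetical names, the sample messages are invented, and the annotation roles (bashExecution and friends) are left out for brevity.

```python
def flatten_text(content):
    """Join text from a plain string or a list of {'type': 'text'} blocks."""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    return "\n".join(
        b.get("text", "") for b in content
        if isinstance(b, dict) and b.get("type") == "text" and b.get("text")
    )


def map_message(msg):
    """Map one pi message dict to a Claude Code-style record, or None to drop it."""
    role = msg.get("role")
    content = msg.get("content")
    if role == "user":
        text = flatten_text(content)
        return {"type": "user", "message": {"content": text}} if text.strip() else None
    if role == "assistant":
        blocks = []
        text = flatten_text(content)
        if text.strip():
            blocks.append({"type": "text", "text": text})
        for b in content if isinstance(content, list) else []:
            if isinstance(b, dict) and b.get("type") == "toolCall":
                blocks.append({
                    "type": "tool_use",
                    "id": b.get("id") or "",
                    "name": b.get("name") or "tool",
                    "input": b.get("arguments") or {},
                })
        # thinking and image blocks match neither branch, so they are dropped.
        return {"type": "assistant", "message": {"content": blocks}} if blocks else None
    if role == "toolResult" and msg.get("toolCallId"):
        result = {
            "type": "tool_result",
            "tool_use_id": msg["toolCallId"],
            "content": flatten_text(content) or "(no output)",
        }
        return {"type": "human", "message": {"content": [result]}}
    return None


# Invented sample messages, for illustration only.
assistant = map_message({
    "role": "assistant",
    "content": [
        {"type": "thinking", "thinking": "hidden reasoning"},
        {"type": "text", "text": "Listing files."},
        {"type": "toolCall", "id": "call_1", "name": "bash", "arguments": {"command": "ls"}},
    ],
})
tool_result = map_message({"role": "toolResult", "toolCallId": "call_1", "content": "README.md"})
print([b["type"] for b in assistant["message"]["content"]])
print(tool_result["type"], tool_result["message"]["content"][0]["tool_use_id"])
```

The tool result keeps the id of the call it answers, which is the link a normalizer would need to fold the result back into the assistant turn.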

Source labelling:
- Staging filenames prefixed 'pi_<uuid>.jsonl' so every drawer's
  source_file metadata (visible in search results) unambiguously
  identifies the harness. Opencode's convention ('<slug>_<id>.jsonl')
  is preserved to keep the existing 19k+ drawers deduped.
- Inline synthetic header on first chunk:
    [session: <title> | <cwd> | <date> | source: pi]
  as a secondary signal.
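
As a rough sketch of the inline header, the snippet below builds the synthetic first turn and writes it ahead of the real turns. `session_header` and `write_transcript` are hypothetical helper names and the title, cwd, and date are invented sample values; only the header layout itself comes from the description above.

```python
import io
import json


def session_header(title, cwd, date, source="pi"):
    """Build the synthetic first user turn that labels the harness."""
    line = f"[session: {title} | {cwd} | {date} | source: {source}]"
    return {"type": "user", "message": {"content": line}}


def write_transcript(out, header, turns):
    """Write the header followed by the turns, one JSON object per line."""
    for record in [header, *turns]:
        out.write(json.dumps(record, ensure_ascii=False) + "\n")


# Invented sample values, for illustration only.
buf = io.StringIO()
header = session_header("Fix flaky test", "/home/me/project", "2026-05-05")
turns = [{"type": "user", "message": {"content": "why does this test fail?"}}]
write_transcript(buf, header, turns)

first_line = json.loads(buf.getvalue().splitlines()[0])
print(first_line["message"]["content"])
```

Since the header is an ordinary user turn, it travels with the first chunk of the transcript and needs no extra metadata support from the miner.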
2026-05-05 08:48:20 +02:00

474 lines
18 KiB
Bash
Executable File

#!/usr/bin/env bash
# mempalace-pi-session — mine pi coding-agent session history into MemPalace
#
# Pi persists every session (verbatim user/assistant turns + tool calls + tool
# results) as newline-delimited JSONL under ~/.pi/agent/sessions/. Pi has no
# upstream MemPalace integration and mempalace-toolkit's existing wrapper
# (`mempalace-session`) only handles opencode's SQLite DB, so pi sessions are
# currently invisible to the palace.
#
# Strategy (mirrors mempalace-session):
#   1. Walk ~/.pi/agent/sessions/**/*.jsonl and export each qualifying session
#      to a Claude Code JSONL file (format the mempalace normalizer speaks).
#   2. Stage exports under ~/.cache/mempalace-pi-session/<wing>/.
#   3. Run `mempalace mine --mode convos` against the staging dir.
#
# Labelling: every exported transcript begins with a synthetic header
#   [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
# so post-mine search results are self-identifying (pi vs opencode vs other).
#
# Dedup: mempalace convos mode keys on source_file (absolute staging path).
# Staging paths are deterministic per pi session UUID, so re-runs are
# idempotent until session content actually changes.
#
# Session filter: sessions with fewer than --min-messages *user+assistant*
# messages (default 3) are skipped to avoid filing single-prompt throwaways.
#
# Usage:
#   mempalace-pi-session
#   mempalace-pi-session --wing <name>
#   mempalace-pi-session --session <uuid-prefix>
#   mempalace-pi-session --since 2026-04-01
#   mempalace-pi-session --min-messages 6
#   mempalace-pi-session --dry-run
#   mempalace-pi-session --help
#
# Exit codes:
#   0  success
#   1  usage / argument error
#   2  pi sessions dir missing
#   3  mempalace CLI not installed
#   4  mine failed
#
# Dependencies: bash, python3 (stdlib only), mempalace (v3.3.3+)
set -euo pipefail
# ── Defaults ─────────────────────────────────────────────────────────
AGENT="${USER:-mempalace}"
WING="wing_conversations"
SESSION_ID=""
SINCE=""
MIN_MESSAGES=3
DRY_RUN=0
NO_REPAIR=0
PI_SESSIONS_DIR="${PI_SESSIONS_DIR:-$HOME/.pi/agent/sessions}"
# ── Usage ────────────────────────────────────────────────────────────
usage() {
  cat <<'EOF'
mempalace-pi-session — mine pi coding-agent session history into MemPalace

Usage:
  mempalace-pi-session [options]

Options:
  --wing <name>           Target wing (default: wing_conversations)
  --session <prefix>      Export one session only (match on UUID prefix)
  --since <YYYY-MM-DD>    Only sessions last modified on/after this date
  --min-messages <N>      Skip sessions with fewer than N user+assistant
                          turns (default: 3)
  --agent <name>          Agent name recorded on drawers (default: $USER)
  --sessions-dir <path>   Path to pi sessions dir (default: $PI_SESSIONS_DIR
                          or ~/.pi/agent/sessions)
  --dry-run               Export + list; do not mine into palace. Each session
                          is tagged [NEW] or [SKIP] based on whether its
                          source_file is already in the palace.
  --no-repair             Skip `mempalace repair` after mining
  -h, --help              Show this help

Idempotency:
  Re-running on the same corpus is safe. The export step writes every
  qualifying session to the cache; the mine step dedups by source_file so
  already-filed sessions are skipped without re-embedding.

Transcript shape per session:
  - Synthetic header as first user turn:
      [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
  - User/assistant messages extracted from pi JSONL `message` entries
  - Assistant toolCall blocks → Claude Code `tool_use` blocks
  - `toolResult` role messages → `tool_result` blocks (folded back into
    the assistant turn by the normalizer)
  - `bashExecution`, `custom(display=true)`, `branchSummary`,
    `compactionSummary` → rendered as text annotations
  - `thinking` content blocks → dropped (noise)
  - Image content blocks → dropped (palace embeds text only)

Dedup:
  - source_file = absolute staging path (deterministic per pi session UUID)
  - Re-runs skip unchanged sessions. To force re-mining, delete the staging
    dir: rm -rf ~/.cache/mempalace-pi-session/<wing>/

Rationale:
  Pi's extension ecosystem could, in principle, stream sessions into the
  palace as they happen, but that requires a resident MCP connection during
  every pi session. This wrapper is the batch, harness-agnostic alternative:
  it reads the durable on-disk JSONL and mines it on a schedule.
EOF
}
# ── Parse args ───────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
  case "$1" in
    -h|--help)      usage; exit 0 ;;
    --wing)         WING="${2:-}"; shift 2 ;;
    --session)      SESSION_ID="${2:-}"; shift 2 ;;
    --since)        SINCE="${2:-}"; shift 2 ;;
    --min-messages) MIN_MESSAGES="${2:-}"; shift 2 ;;
    --agent)        AGENT="${2:-}"; shift 2 ;;
    --sessions-dir) PI_SESSIONS_DIR="${2:-}"; shift 2 ;;
    --dry-run)      DRY_RUN=1; shift ;;
    --no-repair)    NO_REPAIR=1; shift ;;
    --)             shift; break ;;
    -*)             echo "error: unknown option: $1" >&2; usage >&2; exit 1 ;;
    *)              echo "error: unexpected arg: $1" >&2; exit 1 ;;
  esac
done
# ── Preflight ────────────────────────────────────────────────────────
if [[ ! -d "$PI_SESSIONS_DIR" ]]; then
  echo "error: pi sessions dir not found at $PI_SESSIONS_DIR" >&2
  echo "       override with --sessions-dir <path> or PI_SESSIONS_DIR env var" >&2
  exit 2
fi
if ! command -v mempalace >/dev/null 2>&1; then
  echo "error: mempalace CLI not found in PATH" >&2
  exit 3
fi
if ! [[ "$MIN_MESSAGES" =~ ^[0-9]+$ ]]; then
  echo "error: --min-messages must be an integer" >&2
  exit 1
fi
# ── Staging dir ──────────────────────────────────────────────────────
CACHE_ROOT="${XDG_CACHE_HOME:-$HOME/.cache}/mempalace-pi-session"
STAGE="$CACHE_ROOT/$WING"
mkdir -p "$STAGE"
# ── Export sessions (Python heredoc) ────────────────────────────────
# Parses pi JSONL files and writes Claude Code JSONL per session into $STAGE.
# Also classifies each export as NEW/ALREADY FILED (by source_file lookup)
# so --dry-run reports the real mine-set size. Classification is advisory;
# `mempalace mine --mode convos` is still the authoritative dedup.
export_count=$(python3 - "$PI_SESSIONS_DIR" "$STAGE" "$SESSION_ID" "$SINCE" "$MIN_MESSAGES" <<'PY'
import json, os, sqlite3, sys
from datetime import datetime, timezone
from pathlib import Path
sessions_dir, stage, session_filter, since, min_messages = sys.argv[1:6]
min_messages = int(min_messages)
stage = Path(stage)
sessions_dir = Path(sessions_dir)
# Convert --since YYYY-MM-DD to epoch seconds (comparing against file mtime)
since_epoch = None
if since:
    try:
        since_epoch = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
    except ValueError:
        print(f"error: --since must be YYYY-MM-DD, got {since!r}", file=sys.stderr)
        sys.exit(1)
# ── Load palace's already-filed source_files (best-effort, read-only) ──
already_filed = set()
palace_path = os.environ.get("MEMPALACE_PATH", os.path.expanduser("~/.mempalace/palace"))
chroma_db = Path(palace_path) / "chroma.sqlite3"
if chroma_db.is_file():
    try:
        pcon = sqlite3.connect(f"file:{chroma_db}?mode=ro", uri=True)
        for (sf,) in pcon.execute(
            "SELECT DISTINCT string_value FROM embedding_metadata "
            "WHERE key='source_file' AND string_value LIKE ?",
            (f"{stage}%",),
        ):
            if sf:
                already_filed.add(sf)
        pcon.close()
    except sqlite3.Error:
        pass  # palace unreachable → miner will dedup
def extract_text(content):
    """Flatten a message content (string | list-of-blocks) to plain text.

    Drops image + thinking blocks, joins text blocks with newlines, and
    converts toolCall blocks into Claude Code tool_use dicts. Returns
    (text, tool_uses, tool_results); tool_results is always empty here
    because toolResult messages are assembled by the caller.
    """
    if isinstance(content, str):
        return content, [], []
    if not isinstance(content, list):
        return "", [], []
    text_parts = []
    tool_uses = []
    for block in content:
        if not isinstance(block, dict):
            continue
        bt = block.get("type")
        if bt == "text":
            t = block.get("text", "")
            if t:
                text_parts.append(t)
        elif bt == "thinking":
            # Drop reasoning content: high-noise, low-signal for search.
            continue
        elif bt == "image":
            # Palace is text-only.
            continue
        elif bt == "toolCall":
            tool_uses.append({
                "type": "tool_use",
                "id": block.get("id") or "",
                "name": block.get("name") or "tool",
                "input": block.get("arguments") or {},
            })
    return "\n".join(text_parts), tool_uses, []
def load_session(path: Path):
    """Parse a pi JSONL session file. Returns (header, entries) or None."""
    try:
        with path.open("r", encoding="utf-8") as f:
            lines = [ln for ln in f.read().splitlines() if ln.strip()]
    except OSError:
        return None
    if not lines:
        return None
    try:
        header = json.loads(lines[0])
    except json.JSONDecodeError:
        return None
    if header.get("type") != "session":
        return None
    entries = []
    for ln in lines[1:]:
        try:
            entries.append(json.loads(ln))
        except json.JSONDecodeError:
            continue
    return header, entries
def derive_title(entries, fallback: str) -> str:
    """Prefer session_info.name; else truncated first user message."""
    # session_info entries: most-recent wins
    name = None
    for e in entries:
        if e.get("type") == "session_info" and e.get("name"):
            name = e["name"]
    if name:
        return name[:120]
    for e in entries:
        if e.get("type") != "message":
            continue
        msg = e.get("message") or {}
        if msg.get("role") != "user":
            continue
        text, _, _ = extract_text(msg.get("content"))
        text = " ".join(text.split())  # collapse whitespace
        if text:
            return (text[:80] + "…") if len(text) > 80 else text
    return fallback
# Discover session files
paths = sorted(sessions_dir.rglob("*.jsonl"))
if session_filter:
    # Substring match on the file name (not anchored to the start).
    paths = [p for p in paths if session_filter in p.name]
exported = 0
skipped_short = 0
skipped_malformed = 0
skipped_already_filed = 0
for path in paths:
    try:
        mtime = path.stat().st_mtime
    except OSError:
        continue
    if since_epoch is not None and mtime < since_epoch:
        continue
    parsed = load_session(path)
    if parsed is None:
        skipped_malformed += 1
        continue
    header, entries = parsed
    session_uuid = header.get("id") or path.stem
    cwd = header.get("cwd") or "?"
    header_ts = header.get("timestamp") or ""
    try:
        date_str = header_ts[:10] if header_ts else datetime.fromtimestamp(
            mtime, tz=timezone.utc).strftime("%Y-%m-%d")
    except Exception:
        date_str = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime("%Y-%m-%d")
    # Count user+assistant message entries for the min-messages filter
    turn_count = sum(
        1 for e in entries
        if e.get("type") == "message"
        and (e.get("message") or {}).get("role") in ("user", "assistant")
    )
    if turn_count < min_messages:
        skipped_short += 1
        continue
    title = derive_title(entries, fallback=session_uuid[:8])
    out_lines = []
    out_lines.append({
        "type": "user",
        "message": {
            "content": f"[session: {title} | {cwd} | {date_str} | source: pi]"
        },
    })
    for e in entries:
        t = e.get("type")
        if t == "message":
            msg = e.get("message") or {}
            role = msg.get("role")
            if role == "user":
                text, _, _ = extract_text(msg.get("content"))
                if text.strip():
                    out_lines.append({"type": "user", "message": {"content": text}})
            elif role == "assistant":
                text, tool_uses, _ = extract_text(msg.get("content"))
                blocks = []
                if text.strip():
                    blocks.append({"type": "text", "text": text})
                blocks.extend(tool_uses)
                if not blocks:
                    continue
                # Simplify single-text to string (matches mempalace-session).
                if len(blocks) == 1 and blocks[0].get("type") == "text":
                    content = blocks[0]["text"]
                else:
                    content = blocks
                out_lines.append({"type": "assistant", "message": {"content": content}})
            elif role == "toolResult":
                text, _, _ = extract_text(msg.get("content"))
                tool_id = msg.get("toolCallId") or ""
                if not tool_id:
                    continue
                out_lines.append({
                    "type": "human",
                    "message": {
                        "content": [{
                            "type": "tool_result",
                            "tool_use_id": tool_id,
                            "content": text or "(no output)",
                        }],
                    },
                })
            elif role == "bashExecution":
                # Rendered as a synthetic user-turn annotation so the
                # command + output stay associated with the surrounding turn.
                cmd = msg.get("command") or ""
                out = msg.get("output") or ""
                exit_code = msg.get("exitCode")
                note = f"[user-bash] $ {cmd}\nexit={exit_code}\n{out}".strip()
                if note:
                    out_lines.append({"type": "user", "message": {"content": note}})
            elif role == "custom":
                if not msg.get("display"):
                    continue
                text, _, _ = extract_text(msg.get("content"))
                if text.strip():
                    ctype = msg.get("customType") or "custom"
                    out_lines.append({
                        "type": "user",
                        "message": {"content": f"[custom:{ctype}] {text}"},
                    })
            elif role in ("branchSummary", "compactionSummary"):
                summary = msg.get("summary") or ""
                if summary.strip():
                    out_lines.append({
                        "type": "user",
                        "message": {"content": f"[{role}] {summary}"},
                    })
            # thinking-only / empty messages silently dropped
        elif t in (
            "model_change", "thinking_level_change", "compaction",
            "branch_summary", "label", "session_info", "custom",
            "custom_message",
        ):
            # Non-conversational entries: drop. (custom_message with
            # display=true could be included but we already get it via the
            # "custom" message role above when pi materializes one.)
            continue
    # Need at least 2 turns (header + one real turn) for the normalizer.
    if len(out_lines) < 2:
        skipped_short += 1
        continue
    out_path = stage / f"pi_{session_uuid}.jsonl"
    with out_path.open("w", encoding="utf-8") as f:
        for obj in out_lines:
            f.write(json.dumps(obj, ensure_ascii=False) + "\n")
    # Preserve session mtime on the staging file for dedup stability.
    try:
        os.utime(out_path, (mtime, mtime))
    except OSError:
        pass
    exported += 1
    is_filed = str(out_path) in already_filed
    if is_filed:
        skipped_already_filed += 1
    status = "SKIP" if is_filed else "NEW "
    print(f"  [{status}] {out_path.name} ({turn_count} turns)", file=sys.stderr)
print(f"EXPORTED {exported}")
print(f"ALREADY_FILED {skipped_already_filed}")
if skipped_short:
    print(f"SKIPPED_SHORT {skipped_short}", file=sys.stderr)
if skipped_malformed:
    print(f"SKIPPED_MALFORMED {skipped_malformed}", file=sys.stderr)
PY
)
# Parse counts from stdout
count="$(printf '%s\n' "$export_count" | awk '/^EXPORTED / { print $2 }')"
count="${count:-0}"
already_filed="$(printf '%s\n' "$export_count" | awk '/^ALREADY_FILED / { print $2 }')"
already_filed="${already_filed:-0}"
to_file=$(( count - already_filed ))
if [[ "$count" -eq 0 ]]; then
  echo "no sessions qualified for export"
  exit 0
fi
echo ""
echo "Exported $count session(s) to $STAGE"
echo "  $to_file new → will be filed on mine"
echo "  $already_filed already filed → will be skipped (dedup by source_file)"
if [[ $DRY_RUN -eq 1 ]]; then
  if [[ "$to_file" -eq 0 ]]; then
    echo ""
    echo "--dry-run: no new sessions to mine. A real run would skip all $count."
  else
    echo ""
    echo "--dry-run: skipping mine step. A real run would file $to_file new session(s)."
  fi
  exit 0
fi
# ── Run the mine ─────────────────────────────────────────────────────
echo ""
echo "Mining into wing '$WING'..."
if ! mempalace mine "$STAGE" --mode convos --wing "$WING" --agent "$AGENT"; then
  echo "error: mempalace mine failed" >&2
  exit 4
fi
# ── Repair index ─────────────────────────────────────────────────────
if [[ $NO_REPAIR -eq 0 ]]; then
  echo ""
  echo "Rebuilding HNSW index..."
  mempalace repair --yes
fi
echo ""
echo "Done. Wing '$WING' updated. Remember to reconnect any live MCP sessions."