feat(pi-session): add mempalace-pi-session feeder for pi coding-agent sessions
Parallel to mempalace-session, this wrapper walks ~/.pi/agent/sessions/
JSONL files and mines qualifying sessions into wing_conversations via
'mempalace mine --mode convos'.
Design choices mirror mempalace-session:
- Export-stage-mine idiom with deterministic per-session staging paths
under ~/.cache/mempalace-pi-session/<wing>/, so 'mempalace mine' dedup
on source_file makes re-runs idempotent.
- --dry-run classifies each export as [NEW] or [SKIP] by matching staging
path against the palace's already-filed source_files.
- --min-messages filter skips throwaway single-prompt sessions.
Pi-specific parsing:
- Pi JSONL is a typed tree (id/parentId) per docs/session-format.md;
this walks in file order, which is correct for the overwhelmingly
linear case and harmlessly duplicative on branched sessions (palace
semantic dedup handles it).
- Roles mapped to Claude Code JSONL shape:
user -> {type:user, content:text}
assistant -> {type:assistant, content:[text, tool_use]}
toolResult-> {type:human, content:[tool_result]} (folded back by normalizer)
bashExecution/custom(display)/branchSummary/compactionSummary
-> rendered as text annotations
- thinking blocks and image blocks dropped (noise / palace is text-only).
Source labelling:
- Staging filenames prefixed 'pi_<uuid>.jsonl' so every drawer's
source_file metadata (visible in search results) unambiguously
identifies the harness. Opencode's convention ('<slug>_<id>.jsonl')
is preserved to keep the existing 19k+ drawers deduped.
- Inline synthetic header on first chunk:
[session: <title> | <cwd> | <date> | source: pi]
as a secondary signal.
This commit is contained in:
Executable
+473
@@ -0,0 +1,473 @@
|
||||
#!/usr/bin/env bash
|
||||
# mempalace-pi-session — mine pi coding-agent session history into MemPalace
|
||||
#
|
||||
# Pi persists every session (verbatim user/assistant turns + tool calls + tool
|
||||
# results) as newline-delimited JSONL under ~/.pi/agent/sessions/. Pi has no
|
||||
# upstream MemPalace integration and mempalace-toolkit's existing wrapper
|
||||
# (`mempalace-session`) only handles opencode's SQLite DB, so pi sessions are
|
||||
# currently invisible to the palace.
|
||||
#
|
||||
# Strategy (mirrors mempalace-session):
|
||||
# 1. Walk ~/.pi/agent/sessions/**/*.jsonl and export each qualifying session
|
||||
# to a Claude Code JSONL file (format the mempalace normalizer speaks).
|
||||
# 2. Stage exports under ~/.cache/mempalace-pi-session/<wing>/.
|
||||
# 3. Run `mempalace mine --mode convos` against the staging dir.
|
||||
#
|
||||
# Labelling: every exported transcript begins with a synthetic header
|
||||
# [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
|
||||
# so post-mine search results are self-identifying (pi vs opencode vs other).
|
||||
#
|
||||
# Dedup: mempalace convos mode keys on source_file (absolute staging path).
|
||||
# Staging paths are deterministic per pi session UUID, so re-runs are
|
||||
# idempotent until session content actually changes.
|
||||
#
|
||||
# Session filter: sessions with fewer than --min-messages *user+assistant*
|
||||
# messages (default 3) are skipped to avoid filing single-prompt throwaways.
|
||||
#
|
||||
# Usage:
|
||||
# mempalace-pi-session
|
||||
# mempalace-pi-session --wing <name>
|
||||
# mempalace-pi-session --session <uuid-prefix>
|
||||
# mempalace-pi-session --since 2026-04-01
|
||||
# mempalace-pi-session --min-messages 6
|
||||
# mempalace-pi-session --dry-run
|
||||
# mempalace-pi-session --help
|
||||
#
|
||||
# Exit codes:
|
||||
# 0 success
|
||||
# 1 usage / argument error
|
||||
# 2 pi sessions dir missing
|
||||
# 3 mempalace CLI not installed
|
||||
# 4 mine failed
|
||||
#
|
||||
# Dependencies: bash, python3 (stdlib only), mempalace (v3.3.3+)
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
# ── Defaults ─────────────────────────────────────────────────────────
|
||||
AGENT="${USER:-mempalace}"
|
||||
WING="wing_conversations"
|
||||
SESSION_ID=""
|
||||
SINCE=""
|
||||
MIN_MESSAGES=3
|
||||
DRY_RUN=0
|
||||
NO_REPAIR=0
|
||||
PI_SESSIONS_DIR="${PI_SESSIONS_DIR:-$HOME/.pi/agent/sessions}"
|
||||
|
||||
# ── Usage ────────────────────────────────────────────────────────────
|
||||
usage() {
|
||||
cat <<'EOF'
|
||||
mempalace-pi-session — mine pi coding-agent session history into MemPalace
|
||||
|
||||
Usage:
|
||||
mempalace-pi-session [options]
|
||||
|
||||
Options:
|
||||
--wing <name> Target wing (default: wing_conversations)
|
||||
--session <prefix> Export one session only (match on UUID prefix)
|
||||
--since <YYYY-MM-DD> Only sessions last modified on/after this date
|
||||
--min-messages <N> Skip sessions with fewer than N user+assistant
|
||||
turns (default: 3)
|
||||
--agent <name> Agent name recorded on drawers (default: $USER)
|
||||
--sessions-dir <path> Path to pi sessions dir (default: $PI_SESSIONS_DIR
|
||||
or ~/.pi/agent/sessions)
|
||||
--dry-run Export + list; do not mine into palace. Each session
|
||||
is tagged [NEW] or [SKIP] based on whether its
|
||||
source_file is already in the palace.
|
||||
--no-repair Skip `mempalace repair` after mining
|
||||
-h, --help Show this help
|
||||
|
||||
Idempotency:
|
||||
Re-running on the same corpus is safe. The export step writes every
|
||||
qualifying session to the cache; the mine step dedups by source_file so
|
||||
already-filed sessions are skipped without re-embedding.
|
||||
|
||||
Transcript shape per session:
|
||||
- Synthetic header as first user turn:
|
||||
[session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
|
||||
- User/assistant messages extracted from pi JSONL `message` entries
|
||||
- Assistant toolCall blocks → Claude Code `tool_use` blocks
|
||||
- `toolResult` role messages → `tool_result` blocks (folded back into
|
||||
the assistant turn by the normalizer)
|
||||
- `bashExecution`, `custom(display=true)`, `branchSummary`,
|
||||
`compactionSummary` → rendered as text annotations
|
||||
- `thinking` content blocks → dropped (noise)
|
||||
- Image content blocks → dropped (palace embeds text only)
|
||||
|
||||
Dedup:
|
||||
- source_file = absolute staging path (deterministic per pi session UUID)
|
||||
- Re-runs skip unchanged sessions. To force re-mining, delete the staging
|
||||
dir: rm -rf ~/.cache/mempalace-pi-session/<wing>/
|
||||
|
||||
Rationale:
|
||||
Pi's extension ecosystem could, in principle, stream sessions into the
|
||||
palace as they happen, but that requires a resident MCP connection during
|
||||
every pi session. This wrapper is the batch, harness-agnostic alternative:
|
||||
it reads the durable on-disk JSONL and mines it on a schedule.
|
||||
EOF
|
||||
}
|
||||
|
||||
# ── Parse args ───────────────────────────────────────────────────────
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case "$1" in
|
||||
-h|--help) usage; exit 0 ;;
|
||||
--wing) WING="${2:-}"; shift 2 ;;
|
||||
--session) SESSION_ID="${2:-}"; shift 2 ;;
|
||||
--since) SINCE="${2:-}"; shift 2 ;;
|
||||
--min-messages) MIN_MESSAGES="${2:-}"; shift 2 ;;
|
||||
--agent) AGENT="${2:-}"; shift 2 ;;
|
||||
--sessions-dir) PI_SESSIONS_DIR="${2:-}"; shift 2 ;;
|
||||
--dry-run) DRY_RUN=1; shift ;;
|
||||
--no-repair) NO_REPAIR=1; shift ;;
|
||||
--) shift; break ;;
|
||||
-*) echo "error: unknown option: $1" >&2; usage >&2; exit 1 ;;
|
||||
*) echo "error: unexpected arg: $1" >&2; exit 1 ;;
|
||||
esac
|
||||
done
|
||||
|
||||
# ── Preflight ────────────────────────────────────────────────────────
|
||||
if [[ ! -d "$PI_SESSIONS_DIR" ]]; then
|
||||
echo "error: pi sessions dir not found at $PI_SESSIONS_DIR" >&2
|
||||
echo " override with --sessions-dir <path> or PI_SESSIONS_DIR env var" >&2
|
||||
exit 2
|
||||
fi
|
||||
if ! command -v mempalace >/dev/null 2>&1; then
|
||||
echo "error: mempalace CLI not found in PATH" >&2
|
||||
exit 3
|
||||
fi
|
||||
if ! [[ "$MIN_MESSAGES" =~ ^[0-9]+$ ]]; then
|
||||
echo "error: --min-messages must be an integer" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# ── Staging dir ──────────────────────────────────────────────────────
|
||||
CACHE_ROOT="${XDG_CACHE_HOME:-$HOME/.cache}/mempalace-pi-session"
|
||||
STAGE="$CACHE_ROOT/$WING"
|
||||
mkdir -p "$STAGE"
|
||||
|
||||
# ── Export sessions (Python heredoc) ────────────────────────────────
|
||||
# Parses pi JSONL files and writes Claude Code JSONL per session into $STAGE.
|
||||
# Also classifies each export as NEW/ALREADY FILED (by source_file lookup)
|
||||
# so --dry-run reports the real mine-set size. Classification is advisory;
|
||||
# `mempalace mine --mode convos` is still the authoritative dedup.
|
||||
export_count=$(python3 - "$PI_SESSIONS_DIR" "$STAGE" "$SESSION_ID" "$SINCE" "$MIN_MESSAGES" <<'PY'
|
||||
import json, os, sqlite3, sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
sessions_dir, stage, session_filter, since, min_messages = sys.argv[1:6]
|
||||
min_messages = int(min_messages)
|
||||
stage = Path(stage)
|
||||
sessions_dir = Path(sessions_dir)
|
||||
|
||||
# Convert --since YYYY-MM-DD to epoch seconds (comparing against file mtime)
|
||||
since_epoch = None
|
||||
if since:
|
||||
try:
|
||||
since_epoch = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
|
||||
except ValueError:
|
||||
print(f"error: --since must be YYYY-MM-DD, got {since!r}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
# ── Load palace's already-filed source_files (best-effort, read-only) ──
|
||||
already_filed = set()
|
||||
palace_path = os.environ.get("MEMPALACE_PATH", os.path.expanduser("~/.mempalace/palace"))
|
||||
chroma_db = Path(palace_path) / "chroma.sqlite3"
|
||||
if chroma_db.is_file():
|
||||
try:
|
||||
pcon = sqlite3.connect(f"file:{chroma_db}?mode=ro", uri=True)
|
||||
for (sf,) in pcon.execute(
|
||||
"SELECT DISTINCT string_value FROM embedding_metadata "
|
||||
"WHERE key='source_file' AND string_value LIKE ?",
|
||||
(f"{stage}%",),
|
||||
):
|
||||
if sf:
|
||||
already_filed.add(sf)
|
||||
pcon.close()
|
||||
except sqlite3.Error:
|
||||
pass # palace unreachable → miner will dedup
|
||||
|
||||
def extract_text(content):
|
||||
"""Flatten a message content (string | list-of-blocks) to plain text.
|
||||
|
||||
Drops image + thinking blocks; keeps text + renders toolCall/toolResult
|
||||
stubs inline. Returns ("", [tool_uses], [tool_results]) where tool_uses
|
||||
are collected for assistant messages and tool_results for toolResult
|
||||
messages.
|
||||
"""
|
||||
if isinstance(content, str):
|
||||
return content, [], []
|
||||
if not isinstance(content, list):
|
||||
return "", [], []
|
||||
text_parts = []
|
||||
tool_uses = []
|
||||
for block in content:
|
||||
if not isinstance(block, dict):
|
||||
continue
|
||||
bt = block.get("type")
|
||||
if bt == "text":
|
||||
t = block.get("text", "")
|
||||
if t:
|
||||
text_parts.append(t)
|
||||
elif bt == "thinking":
|
||||
# Drop reasoning content — high-noise, low-signal for search.
|
||||
continue
|
||||
elif bt == "image":
|
||||
# Palace is text-only.
|
||||
continue
|
||||
elif bt == "toolCall":
|
||||
tool_uses.append({
|
||||
"type": "tool_use",
|
||||
"id": block.get("id") or "",
|
||||
"name": block.get("name") or "tool",
|
||||
"input": block.get("arguments") or {},
|
||||
})
|
||||
return "\n".join(text_parts), tool_uses, []
|
||||
|
||||
def load_session(path: Path):
|
||||
"""Parse a pi JSONL session file. Returns (header, entries) or None."""
|
||||
try:
|
||||
with path.open("r", encoding="utf-8") as f:
|
||||
lines = [ln for ln in f.read().splitlines() if ln.strip()]
|
||||
except OSError:
|
||||
return None
|
||||
if not lines:
|
||||
return None
|
||||
try:
|
||||
header = json.loads(lines[0])
|
||||
except json.JSONDecodeError:
|
||||
return None
|
||||
if header.get("type") != "session":
|
||||
return None
|
||||
entries = []
|
||||
for ln in lines[1:]:
|
||||
try:
|
||||
entries.append(json.loads(ln))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
return header, entries
|
||||
|
||||
def derive_title(entries, fallback: str) -> str:
|
||||
"""Prefer session_info.name; else truncated first user message."""
|
||||
# session_info entries: most-recent wins
|
||||
name = None
|
||||
for e in entries:
|
||||
if e.get("type") == "session_info" and e.get("name"):
|
||||
name = e["name"]
|
||||
if name:
|
||||
return name[:120]
|
||||
for e in entries:
|
||||
if e.get("type") != "message":
|
||||
continue
|
||||
msg = e.get("message") or {}
|
||||
if msg.get("role") != "user":
|
||||
continue
|
||||
text, _, _ = extract_text(msg.get("content"))
|
||||
text = " ".join(text.split()) # collapse whitespace
|
||||
if text:
|
||||
return (text[:80] + "…") if len(text) > 80 else text
|
||||
return fallback
|
||||
|
||||
# Discover session files
|
||||
paths = sorted(sessions_dir.rglob("*.jsonl"))
|
||||
if session_filter:
|
||||
paths = [p for p in paths if session_filter in p.name]
|
||||
|
||||
exported = 0
|
||||
skipped_short = 0
|
||||
skipped_malformed = 0
|
||||
skipped_already_filed = 0
|
||||
|
||||
for path in paths:
|
||||
try:
|
||||
mtime = path.stat().st_mtime
|
||||
except OSError:
|
||||
continue
|
||||
if since_epoch is not None and mtime < since_epoch:
|
||||
continue
|
||||
|
||||
parsed = load_session(path)
|
||||
if parsed is None:
|
||||
skipped_malformed += 1
|
||||
continue
|
||||
header, entries = parsed
|
||||
session_uuid = header.get("id") or path.stem
|
||||
cwd = header.get("cwd") or "?"
|
||||
header_ts = header.get("timestamp") or ""
|
||||
try:
|
||||
date_str = header_ts[:10] if header_ts else datetime.fromtimestamp(
|
||||
mtime, tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
except Exception:
|
||||
date_str = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime("%Y-%m-%d")
|
||||
|
||||
# Count user+assistant message entries for the min-messages filter
|
||||
turn_count = sum(
|
||||
1 for e in entries
|
||||
if e.get("type") == "message"
|
||||
and (e.get("message") or {}).get("role") in ("user", "assistant")
|
||||
)
|
||||
if turn_count < min_messages:
|
||||
skipped_short += 1
|
||||
continue
|
||||
|
||||
title = derive_title(entries, fallback=session_uuid[:8])
|
||||
out_lines = []
|
||||
out_lines.append({
|
||||
"type": "user",
|
||||
"message": {
|
||||
"content": f"[session: {title} | {cwd} | {date_str} | source: pi]"
|
||||
},
|
||||
})
|
||||
|
||||
for e in entries:
|
||||
t = e.get("type")
|
||||
if t == "message":
|
||||
msg = e.get("message") or {}
|
||||
role = msg.get("role")
|
||||
if role == "user":
|
||||
text, _, _ = extract_text(msg.get("content"))
|
||||
if text.strip():
|
||||
out_lines.append({"type": "user", "message": {"content": text}})
|
||||
elif role == "assistant":
|
||||
text, tool_uses, _ = extract_text(msg.get("content"))
|
||||
blocks = []
|
||||
if text.strip():
|
||||
blocks.append({"type": "text", "text": text})
|
||||
blocks.extend(tool_uses)
|
||||
if not blocks:
|
||||
continue
|
||||
# Simplify single-text to string (matches mempalace-session).
|
||||
if len(blocks) == 1 and blocks[0].get("type") == "text":
|
||||
content = blocks[0]["text"]
|
||||
else:
|
||||
content = blocks
|
||||
out_lines.append({"type": "assistant", "message": {"content": content}})
|
||||
elif role == "toolResult":
|
||||
text, _, _ = extract_text(msg.get("content"))
|
||||
tool_id = msg.get("toolCallId") or ""
|
||||
if not tool_id:
|
||||
continue
|
||||
out_lines.append({
|
||||
"type": "human",
|
||||
"message": {
|
||||
"content": [{
|
||||
"type": "tool_result",
|
||||
"tool_use_id": tool_id,
|
||||
"content": text or "(no output)",
|
||||
}],
|
||||
},
|
||||
})
|
||||
elif role == "bashExecution":
|
||||
# Rendered as a synthetic assistant annotation so the
|
||||
# command + output stay associated with the surrounding turn.
|
||||
cmd = msg.get("command") or ""
|
||||
out = msg.get("output") or ""
|
||||
exit_code = msg.get("exitCode")
|
||||
note = f"[user-bash] $ {cmd}\nexit={exit_code}\n{out}".strip()
|
||||
if note:
|
||||
out_lines.append({"type": "user", "message": {"content": note}})
|
||||
elif role == "custom":
|
||||
if not msg.get("display"):
|
||||
continue
|
||||
text, _, _ = extract_text(msg.get("content"))
|
||||
if text.strip():
|
||||
ctype = msg.get("customType") or "custom"
|
||||
out_lines.append({
|
||||
"type": "user",
|
||||
"message": {"content": f"[custom:{ctype}] {text}"},
|
||||
})
|
||||
elif role in ("branchSummary", "compactionSummary"):
|
||||
summary = msg.get("summary") or ""
|
||||
if summary.strip():
|
||||
out_lines.append({
|
||||
"type": "user",
|
||||
"message": {"content": f"[{role}] {summary}"},
|
||||
})
|
||||
# thinking-only / empty messages silently dropped
|
||||
elif t in (
|
||||
"model_change", "thinking_level_change", "compaction",
|
||||
"branch_summary", "label", "session_info", "custom",
|
||||
"custom_message",
|
||||
):
|
||||
# Non-conversational entries: drop. (custom_message with
|
||||
# display=true could be included but we already get it via the
|
||||
# "custom" message role above when pi materializes one.)
|
||||
continue
|
||||
|
||||
# Need at least 2 turns (header + one real turn) for the normalizer.
|
||||
if len(out_lines) < 2:
|
||||
skipped_short += 1
|
||||
continue
|
||||
|
||||
out_path = stage / f"pi_{session_uuid}.jsonl"
|
||||
with out_path.open("w", encoding="utf-8") as f:
|
||||
for obj in out_lines:
|
||||
f.write(json.dumps(obj, ensure_ascii=False) + "\n")
|
||||
|
||||
# Preserve session mtime on the staging file for dedup stability.
|
||||
try:
|
||||
os.utime(out_path, (mtime, mtime))
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
exported += 1
|
||||
is_filed = str(out_path) in already_filed
|
||||
if is_filed:
|
||||
skipped_already_filed += 1
|
||||
status = "SKIP" if is_filed else "NEW "
|
||||
print(f" [{status}] {out_path.name} ({turn_count} turns)", file=sys.stderr)
|
||||
|
||||
print(f"EXPORTED {exported}")
|
||||
print(f"ALREADY_FILED {skipped_already_filed}")
|
||||
if skipped_short:
|
||||
print(f"SKIPPED_SHORT {skipped_short}", file=sys.stderr)
|
||||
if skipped_malformed:
|
||||
print(f"SKIPPED_MALFORMED {skipped_malformed}", file=sys.stderr)
|
||||
PY
|
||||
)
|
||||
|
||||
# Parse counts from stdout
|
||||
count="$(printf '%s\n' "$export_count" | awk '/^EXPORTED / { print $2 }')"
|
||||
count="${count:-0}"
|
||||
already_filed="$(printf '%s\n' "$export_count" | awk '/^ALREADY_FILED / { print $2 }')"
|
||||
already_filed="${already_filed:-0}"
|
||||
to_file=$(( count - already_filed ))
|
||||
|
||||
if [[ "$count" -eq 0 ]]; then
|
||||
echo "no sessions qualified for export"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Exported $count session(s) to $STAGE"
|
||||
echo " $to_file new → will be filed on mine"
|
||||
echo " $already_filed already filed → will be skipped (dedup by source_file)"
|
||||
|
||||
if [[ $DRY_RUN -eq 1 ]]; then
|
||||
if [[ "$to_file" -eq 0 ]]; then
|
||||
echo ""
|
||||
echo "--dry-run: no new sessions to mine. A real run would skip all $count."
|
||||
else
|
||||
echo ""
|
||||
echo "--dry-run: skipping mine step. A real run would file $to_file new session(s)."
|
||||
fi
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# ── Run the mine ─────────────────────────────────────────────────────
|
||||
echo ""
|
||||
echo "Mining into wing '$WING'..."
|
||||
if ! mempalace mine "$STAGE" --mode convos --wing "$WING" --agent "$AGENT"; then
|
||||
echo "error: mempalace mine failed" >&2
|
||||
exit 4
|
||||
fi
|
||||
|
||||
# ── Repair index ─────────────────────────────────────────────────────
|
||||
if [[ $NO_REPAIR -eq 0 ]]; then
|
||||
echo ""
|
||||
echo "Rebuilding HNSW index..."
|
||||
mempalace repair --yes
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "Done. Wing '$WING' updated. Remember to reconnect any live MCP sessions."
|
||||
Reference in New Issue
Block a user