From 9450a451940e44fb0eba8425f75ab617f3401be2 Mon Sep 17 00:00:00 2001
From: Joakim Persson
Date: Tue, 5 May 2026 08:48:20 +0200
Subject: [PATCH] feat(pi-session): add mempalace-pi-session feeder for pi coding-agent sessions

Parallel to mempalace-session, this wrapper walks ~/.pi/agent/sessions/
JSONL files and mines qualifying sessions into wing_conversations via
'mempalace mine --mode convos'.

Design choices mirror mempalace-session:
- Export-stage-mine idiom with deterministic per-session staging paths
  under ~/.cache/mempalace-pi-session//, so 'mempalace mine' dedup on
  source_file makes re-runs idempotent.
- --dry-run classifies each export as [NEW] or [SKIP] by matching staging
  path against the palace's already-filed source_files.
- --min-messages filter skips throwaway single-prompt sessions.

Pi-specific parsing:
- Pi JSONL is a typed tree (id/parentId) per docs/session-format.md; this
  walks in file order, which is correct for the overwhelmingly linear case
  and harmlessly duplicative on branched sessions (palace semantic dedup
  handles it).
- Roles mapped to Claude Code JSONL shape:
    user       -> {type:user, content:text}
    assistant  -> {type:assistant, content:[text, tool_use]}
    toolResult -> {type:human, content:[tool_result]} (folded back by normalizer)
    bashExecution/custom(display)/branchSummary/compactionSummary
               -> rendered as text annotations
- thinking blocks and image blocks dropped (noise / palace is text-only).

Source labelling:
- Staging filenames prefixed 'pi_.jsonl' so every drawer's source_file
  metadata (visible in search results) unambiguously identifies the
  harness. Opencode's convention ('_.jsonl') is preserved to keep the
  existing 19k+ drawers deduped.
- Inline synthetic header on first chunk:
    [session: | <cwd> | <date> | source: pi]
  as a secondary signal.
---
 bin/mempalace-pi-session | 473 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 473 insertions(+)
 create mode 100755 bin/mempalace-pi-session

diff --git a/bin/mempalace-pi-session b/bin/mempalace-pi-session
new file mode 100755
index 0000000..54b6128
--- /dev/null
+++ b/bin/mempalace-pi-session
@@ -0,0 +1,473 @@
+#!/usr/bin/env bash
+# mempalace-pi-session — mine pi coding-agent session history into MemPalace
+#
+# Pi persists every session (verbatim user/assistant turns + tool calls + tool
+# results) as newline-delimited JSONL under ~/.pi/agent/sessions/. Pi has no
+# upstream MemPalace integration and mempalace-toolkit's existing wrapper
+# (`mempalace-session`) only handles opencode's SQLite DB, so pi sessions are
+# currently invisible to the palace.
+#
+# Strategy (mirrors mempalace-session):
+#   1. Walk ~/.pi/agent/sessions/**/*.jsonl and export each qualifying session
+#      to a Claude Code JSONL file (format the mempalace normalizer speaks).
+#   2. Stage exports under ~/.cache/mempalace-pi-session/<wing>/.
+#   3. Run `mempalace mine --mode convos` against the staging dir.
+#
+# Labelling: every exported transcript begins with a synthetic header
+#   [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
+# so post-mine search results are self-identifying (pi vs opencode vs other).
+#
+# Dedup: mempalace convos mode keys on source_file (absolute staging path).
+# Staging paths are deterministic per pi session UUID, so re-runs are
+# idempotent until session content actually changes.
+#
+# Session filter: sessions with fewer than --min-messages *user+assistant*
+# messages (default 3) are skipped to avoid filing single-prompt throwaways.
+#
+# Usage:
+#   mempalace-pi-session
+#   mempalace-pi-session --wing <name>
+#   mempalace-pi-session --session <uuid-prefix>
+#   mempalace-pi-session --since 2026-04-01
+#   mempalace-pi-session --min-messages 6
+#   mempalace-pi-session --dry-run
+#   mempalace-pi-session --help
+#
+# Exit codes:
+#   0 success
+#   1 usage / argument error
+#   2 pi sessions dir missing
+#   3 mempalace CLI not installed
+#   4 mine failed
+#
+# Dependencies: bash, python3 (stdlib only), mempalace (v3.3.3+)
+
+set -euo pipefail
+
+# ── Defaults ─────────────────────────────────────────────────────────
+AGENT="${USER:-mempalace}"
+WING="wing_conversations"
+SESSION_ID=""
+SINCE=""
+MIN_MESSAGES=3
+DRY_RUN=0
+NO_REPAIR=0
+PI_SESSIONS_DIR="${PI_SESSIONS_DIR:-$HOME/.pi/agent/sessions}"
+
+# ── Usage ────────────────────────────────────────────────────────────
+usage() {
+  cat <<'EOF'
+mempalace-pi-session — mine pi coding-agent session history into MemPalace
+
+Usage:
+  mempalace-pi-session [options]
+
+Options:
+  --wing <name>           Target wing (default: wing_conversations)
+  --session <prefix>      Export one session only (match on UUID prefix)
+  --since <YYYY-MM-DD>    Only sessions last modified on/after this date
+  --min-messages <N>      Skip sessions with fewer than N user+assistant
+                          turns (default: 3)
+  --agent <name>          Agent name recorded on drawers (default: $USER)
+  --sessions-dir <path>   Path to pi sessions dir (default: $PI_SESSIONS_DIR
+                          or ~/.pi/agent/sessions)
+  --dry-run               Export + list; do not mine into palace. Each session
+                          is tagged [NEW] or [SKIP] based on whether its
+                          source_file is already in the palace.
+  --no-repair             Skip `mempalace repair` after mining
+  -h, --help              Show this help
+
+Idempotency:
+  Re-running on the same corpus is safe. The export step writes every
+  qualifying session to the cache; the mine step dedups by source_file so
+  already-filed sessions are skipped without re-embedding.
+
+Transcript shape per session:
+  - Synthetic header as first user turn:
+      [session: <title> | <cwd> | <YYYY-MM-DD> | source: pi]
+  - User/assistant messages extracted from pi JSONL `message` entries
+  - Assistant toolCall blocks → Claude Code `tool_use` blocks
+  - `toolResult` role messages → `tool_result` blocks (folded back into
+    the assistant turn by the normalizer)
+  - `bashExecution`, `custom(display=true)`, `branchSummary`,
+    `compactionSummary` → rendered as text annotations
+  - `thinking` content blocks → dropped (noise)
+  - Image content blocks → dropped (palace embeds text only)
+
+Dedup:
+  - source_file = absolute staging path (deterministic per pi session UUID)
+  - Re-runs skip unchanged sessions. To force re-mining, delete the staging
+    dir: rm -rf ~/.cache/mempalace-pi-session/<wing>/
+
+Rationale:
+  Pi's extension ecosystem could, in principle, stream sessions into the
+  palace as they happen, but that requires a resident MCP connection during
+  every pi session. This wrapper is the batch, harness-agnostic alternative:
+  it reads the durable on-disk JSONL and mines it on a schedule.
+EOF
+}
+
+# ── Parse args ───────────────────────────────────────────────────────
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    -h|--help)        usage; exit 0 ;;
+    --wing)           WING="${2:-}"; shift 2 ;;
+    --session)        SESSION_ID="${2:-}"; shift 2 ;;
+    --since)          SINCE="${2:-}"; shift 2 ;;
+    --min-messages)   MIN_MESSAGES="${2:-}"; shift 2 ;;
+    --agent)          AGENT="${2:-}"; shift 2 ;;
+    --sessions-dir)   PI_SESSIONS_DIR="${2:-}"; shift 2 ;;
+    --dry-run)        DRY_RUN=1; shift ;;
+    --no-repair)      NO_REPAIR=1; shift ;;
+    --)               shift; break ;;
+    -*)               echo "error: unknown option: $1" >&2; usage >&2; exit 1 ;;
+    *)                echo "error: unexpected arg: $1" >&2; exit 1 ;;
+  esac
+done
+
+# ── Preflight ────────────────────────────────────────────────────────
+if [[ ! -d "$PI_SESSIONS_DIR" ]]; then
+  echo "error: pi sessions dir not found at $PI_SESSIONS_DIR" >&2
+  echo "       override with --sessions-dir <path> or PI_SESSIONS_DIR env var" >&2
+  exit 2
+fi
+if ! command -v mempalace >/dev/null 2>&1; then
+  echo "error: mempalace CLI not found in PATH" >&2
+  exit 3
+fi
+if ! [[ "$MIN_MESSAGES" =~ ^[0-9]+$ ]]; then
+  echo "error: --min-messages must be an integer" >&2
+  exit 1
+fi
+
+# ── Staging dir ──────────────────────────────────────────────────────
+CACHE_ROOT="${XDG_CACHE_HOME:-$HOME/.cache}/mempalace-pi-session"
+STAGE="$CACHE_ROOT/$WING"
+mkdir -p "$STAGE"
+
+# ── Export sessions (Python heredoc) ────────────────────────────────
+# Parses pi JSONL files and writes Claude Code JSONL per session into $STAGE.
+# Also classifies each export as NEW/ALREADY FILED (by source_file lookup)
+# so --dry-run reports the real mine-set size. Classification is advisory;
+# `mempalace mine --mode convos` is still the authoritative dedup.
+export_count=$(python3 - "$PI_SESSIONS_DIR" "$STAGE" "$SESSION_ID" "$SINCE" "$MIN_MESSAGES" <<'PY'
+import json, os, sqlite3, sys
+from datetime import datetime, timezone
+from pathlib import Path
+
+sessions_dir, stage, session_filter, since, min_messages = sys.argv[1:6]
+min_messages = int(min_messages)
+stage = Path(stage)
+sessions_dir = Path(sessions_dir)
+
+# Convert --since YYYY-MM-DD to epoch seconds (comparing against file mtime)
+since_epoch = None
+if since:
+    try:
+        since_epoch = datetime.strptime(since, "%Y-%m-%d").replace(tzinfo=timezone.utc).timestamp()
+    except ValueError:
+        print(f"error: --since must be YYYY-MM-DD, got {since!r}", file=sys.stderr)
+        sys.exit(1)
+
+# ── Load palace's already-filed source_files (best-effort, read-only) ──
+already_filed = set()
+palace_path = os.environ.get("MEMPALACE_PATH", os.path.expanduser("~/.mempalace/palace"))
+chroma_db = Path(palace_path) / "chroma.sqlite3"
+if chroma_db.is_file():
+    try:
+        pcon = sqlite3.connect(f"file:{chroma_db}?mode=ro", uri=True)
+        for (sf,) in pcon.execute(
+            "SELECT DISTINCT string_value FROM embedding_metadata "
+            "WHERE key='source_file' AND string_value LIKE ?",
+            (f"{stage}%",),
+        ):
+            if sf:
+                already_filed.add(sf)
+        pcon.close()
+    except sqlite3.Error:
+        pass  # palace unreachable → miner will dedup
+
+def extract_text(content):
+    """Flatten a message content (string | list-of-blocks) to plain text.
+
+    Drops image + thinking blocks and joins text blocks with newlines.
+    Returns (text, tool_uses, tool_results): toolCall blocks are converted
+    to Claude Code tool_use dicts; tool_results is always empty because
+    toolResult messages are handled by the caller.
+    """
+    if isinstance(content, str):
+        return content, [], []
+    if not isinstance(content, list):
+        return "", [], []
+    text_parts = []
+    tool_uses = []
+    for block in content:
+        if not isinstance(block, dict):
+            continue
+        bt = block.get("type")
+        if bt == "text":
+            t = block.get("text", "")
+            if t:
+                text_parts.append(t)
+        elif bt == "thinking":
+            # Drop reasoning content — high-noise, low-signal for search.
+            continue
+        elif bt == "image":
+            # Palace is text-only.
+            continue
+        elif bt == "toolCall":
+            tool_uses.append({
+                "type": "tool_use",
+                "id": block.get("id") or "",
+                "name": block.get("name") or "tool",
+                "input": block.get("arguments") or {},
+            })
+    return "\n".join(text_parts), tool_uses, []
+
+def load_session(path: Path):
+    """Parse a pi JSONL session file. Returns (header, entries) or None."""
+    try:
+        with path.open("r", encoding="utf-8") as f:
+            lines = [ln for ln in f.read().splitlines() if ln.strip()]
+    except OSError:
+        return None
+    if not lines:
+        return None
+    try:
+        header = json.loads(lines[0])
+    except json.JSONDecodeError:
+        return None
+    if header.get("type") != "session":
+        return None
+    entries = []
+    for ln in lines[1:]:
+        try:
+            entries.append(json.loads(ln))
+        except json.JSONDecodeError:
+            continue
+    return header, entries
+
+def derive_title(entries, fallback: str) -> str:
+    """Prefer session_info.name; else truncated first user message."""
+    # session_info entries: most-recent wins
+    name = None
+    for e in entries:
+        if e.get("type") == "session_info" and e.get("name"):
+            name = e["name"]
+    if name:
+        return name[:120]
+    for e in entries:
+        if e.get("type") != "message":
+            continue
+        msg = e.get("message") or {}
+        if msg.get("role") != "user":
+            continue
+        text, _, _ = extract_text(msg.get("content"))
+        text = " ".join(text.split())  # collapse whitespace
+        if text:
+            return (text[:80] + "…") if len(text) > 80 else text
+    return fallback
+
+# Discover session files
+paths = sorted(sessions_dir.rglob("*.jsonl"))
+if session_filter:
+    paths = [p for p in paths if session_filter in p.name]
+
+exported = 0
+skipped_short = 0
+skipped_malformed = 0
+skipped_already_filed = 0
+
+for path in paths:
+    try:
+        mtime = path.stat().st_mtime
+    except OSError:
+        continue
+    if since_epoch is not None and mtime < since_epoch:
+        continue
+
+    parsed = load_session(path)
+    if parsed is None:
+        skipped_malformed += 1
+        continue
+    header, entries = parsed
+    session_uuid = header.get("id") or path.stem
+    cwd = header.get("cwd") or "?"
+    header_ts = header.get("timestamp") or ""
+    try:
+        date_str = header_ts[:10] if header_ts else datetime.fromtimestamp(
+            mtime, tz=timezone.utc).strftime("%Y-%m-%d")
+    except Exception:
+        date_str = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime("%Y-%m-%d")
+
+    # Count user+assistant message entries for the min-messages filter
+    turn_count = sum(
+        1 for e in entries
+        if e.get("type") == "message"
+        and (e.get("message") or {}).get("role") in ("user", "assistant")
+    )
+    if turn_count < min_messages:
+        skipped_short += 1
+        continue
+
+    title = derive_title(entries, fallback=session_uuid[:8])
+    out_lines = []
+    out_lines.append({
+        "type": "user",
+        "message": {
+            "content": f"[session: {title} | {cwd} | {date_str} | source: pi]"
+        },
+    })
+
+    for e in entries:
+        t = e.get("type")
+        if t == "message":
+            msg = e.get("message") or {}
+            role = msg.get("role")
+            if role == "user":
+                text, _, _ = extract_text(msg.get("content"))
+                if text.strip():
+                    out_lines.append({"type": "user", "message": {"content": text}})
+            elif role == "assistant":
+                text, tool_uses, _ = extract_text(msg.get("content"))
+                blocks = []
+                if text.strip():
+                    blocks.append({"type": "text", "text": text})
+                blocks.extend(tool_uses)
+                if not blocks:
+                    continue
+                # Simplify single-text to string (matches mempalace-session).
+                if len(blocks) == 1 and blocks[0].get("type") == "text":
+                    content = blocks[0]["text"]
+                else:
+                    content = blocks
+                out_lines.append({"type": "assistant", "message": {"content": content}})
+            elif role == "toolResult":
+                text, _, _ = extract_text(msg.get("content"))
+                tool_id = msg.get("toolCallId") or ""
+                if not tool_id:
+                    continue
+                out_lines.append({
+                    "type": "human",
+                    "message": {
+                        "content": [{
+                            "type": "tool_result",
+                            "tool_use_id": tool_id,
+                            "content": text or "(no output)",
+                        }],
+                    },
+                })
+            elif role == "bashExecution":
+                # Rendered as a synthetic user annotation so the
+                # command + output stay associated with the surrounding turn.
+                cmd = msg.get("command") or ""
+                out = msg.get("output") or ""
+                exit_code = msg.get("exitCode")
+                note = f"[user-bash] $ {cmd}\nexit={exit_code}\n{out}".strip()
+                if note:
+                    out_lines.append({"type": "user", "message": {"content": note}})
+            elif role == "custom":
+                if not msg.get("display"):
+                    continue
+                text, _, _ = extract_text(msg.get("content"))
+                if text.strip():
+                    ctype = msg.get("customType") or "custom"
+                    out_lines.append({
+                        "type": "user",
+                        "message": {"content": f"[custom:{ctype}] {text}"},
+                    })
+            elif role in ("branchSummary", "compactionSummary"):
+                summary = msg.get("summary") or ""
+                if summary.strip():
+                    out_lines.append({
+                        "type": "user",
+                        "message": {"content": f"[{role}] {summary}"},
+                    })
+            # thinking-only / empty messages silently dropped
+        elif t in (
+            "model_change", "thinking_level_change", "compaction",
+            "branch_summary", "label", "session_info", "custom",
+            "custom_message",
+        ):
+            # Non-conversational entries: drop. (custom_message with
+            # display=true could be included but we already get it via the
+            # "custom" message role above when pi materializes one.)
+            continue
+
+    # Need at least 2 turns (header + one real turn) for the normalizer.
+    if len(out_lines) < 2:
+        skipped_short += 1
+        continue
+
+    out_path = stage / f"pi_{session_uuid}.jsonl"
+    with out_path.open("w", encoding="utf-8") as f:
+        for obj in out_lines:
+            f.write(json.dumps(obj, ensure_ascii=False) + "\n")
+
+    # Preserve session mtime on the staging file for dedup stability.
+    try:
+        os.utime(out_path, (mtime, mtime))
+    except OSError:
+        pass
+
+    exported += 1
+    is_filed = str(out_path) in already_filed
+    if is_filed:
+        skipped_already_filed += 1
+    status = "SKIP" if is_filed else "NEW "
+    print(f"  [{status}] {out_path.name} ({turn_count} turns)", file=sys.stderr)
+
+print(f"EXPORTED {exported}")
+print(f"ALREADY_FILED {skipped_already_filed}")
+if skipped_short:
+    print(f"SKIPPED_SHORT {skipped_short}", file=sys.stderr)
+if skipped_malformed:
+    print(f"SKIPPED_MALFORMED {skipped_malformed}", file=sys.stderr)
+PY
+)
+
+# Parse counts from stdout
+count="$(printf '%s\n' "$export_count" | awk '/^EXPORTED / { print $2 }')"
+count="${count:-0}"
+already_filed="$(printf '%s\n' "$export_count" | awk '/^ALREADY_FILED / { print $2 }')"
+already_filed="${already_filed:-0}"
+to_file=$(( count - already_filed ))
+
+if [[ "$count" -eq 0 ]]; then
+  echo "no sessions qualified for export"
+  exit 0
+fi
+
+echo ""
+echo "Exported $count session(s) to $STAGE"
+echo "  $to_file new → will be filed on mine"
+echo "  $already_filed already filed → will be skipped (dedup by source_file)"
+
+if [[ $DRY_RUN -eq 1 ]]; then
+  if [[ "$to_file" -eq 0 ]]; then
+    echo ""
+    echo "--dry-run: no new sessions to mine. A real run would skip all $count."
+  else
+    echo ""
+    echo "--dry-run: skipping mine step. A real run would file $to_file new session(s)."
+  fi
+  exit 0
+fi
+
+# ── Run the mine ─────────────────────────────────────────────────────
+echo ""
+echo "Mining into wing '$WING'..."
+if ! mempalace mine "$STAGE" --mode convos --wing "$WING" --agent "$AGENT"; then
+  echo "error: mempalace mine failed" >&2
+  exit 4
+fi
+
+# ── Repair index ─────────────────────────────────────────────────────
+if [[ $NO_REPAIR -eq 0 ]]; then
+  echo ""
+  echo "Rebuilding HNSW index..."
+  mempalace repair --yes
+fi
+
+echo ""
+echo "Done. Wing '$WING' updated. Remember to reconnect any live MCP sessions."
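
Note on branched sessions: the commit message says the exporter walks entries in file order and leaves duplicated branch content to the palace's semantic dedup. If that ever proves too noisy, one possible alternative is to follow parentId links from a leaf back to the root and export only that path. The sketch below is not part of this patch. It assumes every entry carries a unique "id" and an optional "parentId" (the field names come from the commit message), and it guesses that the last entry in the file is the active leaf, which pi's session format may or may not guarantee. The function name active_branch is hypothetical.

```python
def active_branch(entries):
    """Return the entries on the path from the root to the last entry in the file.

    Assumption: each entry is a dict with a unique "id" and an optional
    "parentId" (None or missing for the root). Treating the last id-bearing
    entry as the active leaf is a guess, not documented pi behaviour.
    """
    by_id = {e["id"]: e for e in entries if isinstance(e, dict) and e.get("id")}
    if not by_id:
        # Nothing to link on: fall back to plain file order.
        return list(entries)
    # Hypothetical leaf rule: the last entry in the file that has an id.
    node = next(e for e in reversed(entries) if isinstance(e, dict) and e.get("id"))
    path, seen = [], set()
    while node is not None and node["id"] not in seen:
        seen.add(node["id"])  # guard against parentId cycles in malformed files
        path.append(node)
        node = by_id.get(node.get("parentId"))
    path.reverse()  # collected leaf-to-root, so flip to root-to-leaf
    return path


if __name__ == "__main__":
    demo = [
        {"id": "a", "parentId": None, "type": "message"},
        {"id": "b", "parentId": "a", "type": "message"},  # abandoned branch
        {"id": "c", "parentId": "a", "type": "message"},  # active branch
        {"id": "d", "parentId": "c", "type": "message"},
    ]
    print([e["id"] for e in active_branch(demo)])
```

In the exporter this would sit between load_session() and the per-entry loop, for example `entries = active_branch(entries)`. On a linear session it returns the same entries as the file-order walk, provided every entry has an id; entries without one would be dropped, so that assumption should be checked against docs/session-format.md first.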