Created
March 19, 2026 01:30
-
-
Save afgallo/89510d6b5e473bd47bc674f3934935d8 to your computer and use it in GitHub Desktop.
claude-session-to-obsidian
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Claude Code session export to Obsidian. | |
| Called by SessionEnd hook — receives JSON on stdin with session_id, transcript_path, cwd. | |
| Creates an Obsidian note in Claude Sessions/ and appends a summary to today's daily note. | |
| """ | |
| import json | |
| import subprocess | |
| import sys | |
| import os | |
| import re | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
# Rendered-line budget for the conversation section; format_conversation
# stops adding messages once this many lines have been emitted.
MAX_CONVERSATION_LINES = 500 # Limit note size
# Maximum number of characters of the first user message kept as the summary.
MAX_SUMMARY_LENGTH = 120
def parse_stdin():
    """Parse the hook payload delivered on stdin.

    Returns:
        dict: The decoded JSON object. An empty dict is returned when stdin
        is empty, cannot be decoded, is not valid JSON, or holds valid JSON
        that is not an object (``null``, a list, a number, ...).
    """
    try:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
        # (undecodable bytes on stdin).
        payload = json.loads(sys.stdin.read())
    except (ValueError, EOFError):
        return {}
    # Callers use ``.get`` on the result, so anything that is not a JSON
    # object must be treated like missing input instead of crashing later.
    return payload if isinstance(payload, dict) else {}
def find_transcript(hook_input):
    """Locate the JSONL transcript for the session described by the hook input.

    The ``transcript_path`` entry wins when it names an existing path.
    Otherwise ``~/.claude/projects`` is searched recursively for
    ``<session_id>.jsonl`` and the first hit is used.

    Returns:
        str | None: Path to the transcript, or None when nothing is found.
    """
    explicit_path = hook_input.get("transcript_path", "")
    if explicit_path and os.path.exists(explicit_path):
        return explicit_path

    # No usable explicit path: fall back to a lookup by session id.
    sid = hook_input.get("session_id", "")
    if not sid:
        return None

    projects_root = Path.home() / ".claude" / "projects"
    if not projects_root.exists():
        return None

    first_match = next(projects_root.rglob(f"{sid}.jsonl"), None)
    return None if first_match is None else str(first_match)
def _extract_text(content):
    """Flatten a message ``content`` value (string or list of parts) to text."""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts = []
    for item in content:
        if not isinstance(item, dict):
            continue
        item_type = item.get("type")
        if item_type == "text":
            # ``or ""`` guards against an explicit null text field.
            parts.append(str(item.get("text") or ""))
        elif item_type == "tool_use":
            parts.append(f"[Tool: {item.get('name', 'unknown')}]")
        elif item_type == "tool_result":
            parts.append("[Tool result]")
    return "\n".join(parts)


def parse_transcript(path):
    """Parse a session JSONL transcript into structured messages.

    Only lines whose ``type`` is ``"user"`` or ``"assistant"`` are used.
    Lines that are not valid JSON, or that do not decode to an object, are
    skipped. Tool calls and tool results are reduced to ``[Tool: <name>]`` and
    ``[Tool result]`` placeholders.

    Args:
        path: Path to the ``.jsonl`` transcript file.

    Returns:
        dict: ``messages`` (list of ``{"role", "text", "timestamp"}`` dicts,
        empty-text messages omitted), ``session_id`` (falls back to the file
        stem), ``cwd`` (``""`` when absent), ``first_timestamp`` and
        ``last_timestamp`` (None when no line carried a timestamp).

    Raises:
        OSError: If the transcript cannot be opened or read.
    """
    messages = []
    session_id = None
    cwd = None
    first_timestamp = None
    last_timestamp = None

    # Read as UTF-8 explicitly rather than relying on the locale's default
    # encoding; undecodable bytes are replaced so one bad line cannot abort
    # the whole export.
    with open(path, encoding="utf-8", errors="replace") as f:
        for line in f:
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A line such as ``123`` or ``[]`` is valid JSON but not a record.
            if not isinstance(obj, dict):
                continue

            msg_type = obj.get("type")
            if msg_type not in ("user", "assistant"):
                continue

            if not session_id:
                session_id = obj.get("sessionId", "")
            if not cwd:
                cwd = obj.get("cwd", "")

            timestamp = obj.get("timestamp", "")
            if timestamp:
                if not first_timestamp:
                    first_timestamp = timestamp
                last_timestamp = timestamp

            message = obj.get("message", {})
            if isinstance(message, str):
                # The message may be a JSON-encoded object or plain text.
                raw = message
                try:
                    message = json.loads(raw)
                except json.JSONDecodeError:
                    message = None
                if not isinstance(message, dict):
                    message = {"role": msg_type, "content": raw}
            elif not isinstance(message, dict):
                # e.g. ``"message": null`` -- nothing to extract.
                message = {}

            role = message.get("role", msg_type)
            text = _extract_text(message.get("content", "")).strip()
            if text:
                messages.append({
                    "role": role,
                    "text": text,
                    "timestamp": timestamp,
                })

    return {
        "messages": messages,
        "session_id": session_id or Path(path).stem,
        "cwd": cwd or "",
        "first_timestamp": first_timestamp,
        "last_timestamp": last_timestamp,
    }
def format_timestamp(ts_str):
    """Convert a transcript timestamp to a timezone-aware local datetime.

    Args:
        ts_str: An ISO-8601 string (a trailing ``Z`` is accepted) or a
            numeric epoch value in milliseconds.

    Returns:
        datetime: The timestamp in the local timezone. The current local time
        is returned when the value is missing, malformed, of an unexpected
        type, or out of range. Every path returns an aware datetime so
        results can be compared with each other.
    """
    if not ts_str:
        return datetime.now().astimezone()
    try:
        if isinstance(ts_str, (int, float)):
            # Numeric timestamps are interpreted as milliseconds since the epoch.
            return datetime.fromtimestamp(ts_str / 1000, tz=timezone.utc).astimezone()
        return datetime.fromisoformat(ts_str.replace("Z", "+00:00")).astimezone()
    except (ValueError, TypeError, AttributeError, OverflowError, OSError):
        # AttributeError: value is neither str nor number (e.g. a dict).
        # OverflowError/OSError: epoch outside the platform's supported range.
        return datetime.now().astimezone()
def generate_summary(messages, max_length=None):
    """Build a one-line summary from the first meaningful user message.

    System reminders, tool placeholders and "request interrupted" markers are
    removed first; a user message that is empty afterwards is skipped.
    Whitespace (including newlines) is collapsed to single spaces because the
    summary is embedded in a single-line list item and in a file name.

    Args:
        messages: Iterable of ``{"role", "text", ...}`` dicts.
        max_length: Maximum summary length before truncation at a word
            boundary. Defaults to ``MAX_SUMMARY_LENGTH``.

    Returns:
        str: The summary, or ``"Claude Code session"`` when no user message
        has usable text.
    """
    if max_length is None:
        max_length = MAX_SUMMARY_LENGTH

    for msg in messages:
        if msg["role"] != "user":
            continue
        text = msg["text"]
        # Clean up system tags
        text = re.sub(r"<system-reminder>.*?</system-reminder>", "", text, flags=re.DOTALL)
        # Drop tool placeholders; tool names may contain '-' or '.', so match
        # everything up to the closing bracket rather than only \w.
        text = re.sub(r"\[Tool result\]", "", text)
        text = re.sub(r"\[Tool: [^\]]+\]", "", text)
        text = re.sub(r"\[Request interrupted by user.*?\]", "", text)
        text = " ".join(text.split())
        if not text:
            continue
        if len(text) > max_length:
            # Cut at the last space inside the limit to avoid a split word.
            text = text[:max_length].rsplit(" ", 1)[0] + "..."
        return text
    return "Claude Code session"
def format_conversation(messages, max_lines=None, max_message_lines=50):
    """Format messages into readable markdown.

    Each message becomes a ``**User:**`` / ``**Claude:**`` label, the message
    text, and a blank line. Overlong messages are cut, and output stops once
    the line budget is exceeded.

    Args:
        messages: Sequence of ``{"role", "text", ...}`` dicts.
        max_lines: Line budget for the whole conversation. Defaults to
            ``MAX_CONVERSATION_LINES``. The message that crosses the budget
            is still included in full.
        max_message_lines: Per-message line cap; the remainder is replaced
            by a ``... (N more lines)`` marker.

    Returns:
        str: The markdown text.
    """
    if max_lines is None:
        max_lines = MAX_CONVERSATION_LINES

    lines = []
    line_count = 0
    total = len(messages)
    for index, msg in enumerate(messages, start=1):
        role = "User" if msg["role"] == "user" else "Claude"
        msg_lines = msg["text"].split("\n")
        hidden = len(msg_lines) - max_message_lines
        if hidden > 0:
            # Truncate very long messages
            msg_lines = msg_lines[:max_message_lines] + ["", f"... ({hidden} more lines)"]
        lines.append(f"**{role}:**")
        lines.extend(msg_lines)
        lines.append("")
        line_count += len(msg_lines) + 2
        if line_count > max_lines:
            # Only claim truncation when messages were actually left out.
            if index < total:
                lines.append(f"... (truncated, {total} total messages)")
            break
    return "\n".join(lines)
def obsidian_create(path, content):
    """Create (or overwrite) an Obsidian note via the ``obsidian`` CLI.

    The command is run with an argument list and no shell, so ``content``
    needs no escaping.

    Args:
        path: Vault-relative note path passed as ``path=<path>``.
        content: Note body passed as ``content=<content>``.

    Returns:
        bool: True when the CLI exits with status 0. False on a non-zero
        exit, a timeout (10 s), or when the command cannot be started.
    """
    try:
        result = subprocess.run(
            ["obsidian", "create", f"path={path}", f"content={content}", "overwrite"],
            capture_output=True, text=True, timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError (not just FileNotFoundError): the content travels in argv,
        # so a large note can exceed the OS argument-size limit (E2BIG), and
        # a non-executable binary raises PermissionError.
        return False
    return result.returncode == 0
# Level-2 headings that ensure_daily_sections() guarantees exist in today's
# daily note. Missing headings are appended in this order; session entries are
# added under "## Claude Sessions".
DAILY_SECTIONS = [
    "## Claude Sessions",
    "## Work",
    "## Personal",
    "## Journal",
]
def obsidian_daily_read():
    """Return the text of today's daily note as printed by ``obsidian daily:read``.

    Returns:
        str: The command's stdout on success. An empty string when the
        command exits non-zero, times out (10 s), or is not installed.
    """
    command = ["obsidian", "daily:read"]
    try:
        proc = subprocess.run(command, capture_output=True, text=True, timeout=10)
    except (subprocess.TimeoutExpired, FileNotFoundError):
        return ""
    if proc.returncode != 0:
        return ""
    return proc.stdout
def obsidian_daily_write(content):
    """Overwrite today's daily note with ``content``.

    The note path is obtained from ``obsidian daily:path`` and the note is
    then rewritten with ``obsidian create ... overwrite``.

    Args:
        content: Full replacement text for the daily note.

    Returns:
        bool: True when both CLI calls succeed. False when either exits
        non-zero, times out (10 s each), cannot be started, or when no daily
        note path is reported.
    """
    try:
        located = subprocess.run(
            ["obsidian", "daily:path"],
            capture_output=True, text=True, timeout=10,
        )
        if located.returncode != 0:
            return False
        daily_path = located.stdout.strip()
        if not daily_path:
            # Never issue a create/overwrite against an empty path.
            return False
        written = subprocess.run(
            ["obsidian", "create", f"path={daily_path}", f"content={content}", "overwrite"],
            capture_output=True, text=True, timeout=10,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError also covers an argv that is too large for the OS (E2BIG),
        # since the whole note body is passed as a command-line argument.
        return False
    return written.returncode == 0
def ensure_daily_sections():
    """Ensure today's daily note has every heading in ``DAILY_SECTIONS``.

    An empty note is written with all headings. Otherwise any heading that
    does not appear as its own line is appended to the end of the note.
    """
    content = obsidian_daily_read()

    # NOTE(review): obsidian_daily_read() returns "" both for an empty note
    # and for a failed read, so a failed read leads to an overwrite here.
    # Confirm that is acceptable, or have the reader report failure.
    if not content.strip():
        obsidian_daily_write("\n\n".join(DAILY_SECTIONS) + "\n")
        return

    # Compare whole lines, not substrings: "## Work" must not count as
    # present because of "## Workout" or "### Work". This matches how
    # obsidian_daily_append_to_section() looks headings up.
    present = {row.strip() for row in content.split("\n")}
    missing = [s for s in DAILY_SECTIONS if s not in present]
    if missing:
        new_content = content.rstrip() + "\n\n" + "\n\n".join(missing) + "\n"
        obsidian_daily_write(new_content)
def obsidian_daily_append_to_section(section, line):
    """Insert ``line`` at the end of ``section`` in today's daily note.

    The section is the first line that equals ``section`` once stripped. The
    new line goes directly after the last non-blank line that precedes the
    next ``## `` heading, or directly after the heading when the section is
    empty.

    Returns:
        bool: False when the note is empty or unreadable, or the section is
        absent. Otherwise the result of writing the updated note.
    """
    note_text = obsidian_daily_read()
    if not note_text:
        return False

    rows = note_text.split("\n")
    heading_idx = next(
        (idx for idx, row in enumerate(rows) if row.strip() == section),
        None,
    )
    if heading_idx is None:
        return False

    # Walk the section body, remembering the last row that carries content.
    anchor = heading_idx
    for idx in range(heading_idx + 1, len(rows)):
        row = rows[idx]
        if row.startswith("## "):
            break
        if row.strip():
            anchor = idx

    rows.insert(anchor + 1, line)
    return obsidian_daily_write("\n".join(rows))
def qmd_update():
    """Start ``qmd update`` in the background to re-index QMD collections.

    Fire-and-forget: the process is not waited for and its output is
    discarded. Failure to launch it (missing binary, not executable, ...) is
    ignored because re-indexing is optional.
    """
    try:
        subprocess.Popen(
            ["qmd", "update"],
            # Do not let the background process inherit the hook's stdin.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except OSError:
        # Covers FileNotFoundError as well as PermissionError and friends;
        # none of them should fail the hook at its final step.
        pass
def main():
    """Export the finished Claude Code session to Obsidian.

    Steps: read the hook input from stdin, locate and parse the transcript,
    create a note under ``Claude Sessions/YYYY/MM/``, make sure today's daily
    note has the standard sections, append a link to the session under
    ``## Claude Sessions``, and start a QMD re-index. Exits with status 0 and
    does nothing when no transcript or no messages are found.
    """
    hook_input = parse_stdin()

    # Find transcript
    transcript_path = find_transcript(hook_input)
    if not transcript_path:
        # No transcript found — nothing to export
        sys.exit(0)

    # Parse transcript
    data = parse_transcript(transcript_path)
    messages = data["messages"]
    if not messages:
        sys.exit(0)

    # Extract metadata
    session_id = data["session_id"]
    # Path(...).name ignores a trailing slash, which os.path.basename turns
    # into an empty project name.
    project = (Path(data["cwd"]).name if data["cwd"] else "") or "unknown"
    dt = format_timestamp(data["first_timestamp"])
    date_str = dt.strftime("%d/%m/%Y")
    date_iso = dt.strftime("%Y-%m-%d")
    time_str = dt.strftime("%H:%M")
    summary = generate_summary(messages)
    msg_count = len(messages)

    # Build note path: Claude Sessions/YYYY/MM/filename
    year = dt.strftime("%Y")
    month = dt.strftime("%m")
    # Keep word characters and hyphens only; every whitespace run (including
    # newlines and tabs) becomes one hyphen so the file name stays on one line.
    safe_summary = re.sub(r"[^\w\s-]", "", summary[:60]).strip()
    safe_summary = re.sub(r"\s+", "-", safe_summary) or "session"
    note_name = f"{date_iso}-{time_str.replace(':', '')}-{safe_summary}"
    filename = f"{note_name}.md"
    note_path = f"Claude Sessions/{year}/{month}/{filename}"

    # Build frontmatter + content
    conversation = format_conversation(messages)
    note_content = f"""---
type: claude-session
session_id: {session_id}
date: {date_str}
time: {time_str}
project: {project}
message_count: {msg_count}
tags: []
status: completed
---
## Summary
{summary}
## Conversation
{conversation}"""

    # Create Obsidian note
    obsidian_create(note_path, note_content)

    # Ensure daily note has sections, then append session entry
    ensure_daily_sections()
    # '[', ']' and '|' end or split a wikilink alias, and a newline would
    # break the list item, so neutralise them in the display text.
    link_alias = " ".join(re.sub(r"[\[\]|]", " ", summary).split()) or note_name
    daily_line = f"- \U0001f916 **{time_str}** \u2014 [[Claude Sessions/{year}/{month}/{note_name}|{link_alias}]] ({msg_count} messages, {project})"
    obsidian_daily_append_to_section("## Claude Sessions", daily_line)

    # Re-index QMD
    qmd_update()
# Run the export only when the file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment