#!/usr/bin/env python3 """ Recover content from Claude Code history session files. This script extracts Write tool calls, Edit operations, and text content from Claude Code's JSONL session history files. """ import json import sys import os from pathlib import Path from typing import Dict, List, Any, Optional from datetime import datetime class SessionContentRecovery: """Extract and recover content from Claude Code session files.""" def __init__(self, session_file: Path, output_dir: Optional[Path] = None): self.session_file = Path(session_file) self.output_dir = output_dir or Path.cwd() / "recovered_content" self.output_dir.mkdir(exist_ok=True) # Statistics self.stats = { "total_lines": 0, "write_calls": 0, "edit_calls": 0, "text_mentions": 0, "files_recovered": 0, } def extract_write_calls(self) -> List[Dict[str, Any]]: """Extract all Write tool calls from session.""" write_calls = [] with open(self.session_file, "r") as f: for line_num, line in enumerate(f, 1): self.stats["total_lines"] += 1 try: data = json.loads(line.strip()) # Check both direct role and nested message.role role = data.get("role") or data.get("message", {}).get("role") if role != "assistant": continue # Get content from either location content = data.get("content") or data.get("message", {}).get( "content", [] ) for item in content: if not isinstance(item, dict): continue # Look for Write tool calls if item.get("type") == "tool_use" and item.get("name") == "Write": write_input = item.get("input", {}) write_calls.append( { "line": line_num, "file_path": write_input.get("file_path", ""), "content": write_input.get("content", ""), "timestamp": data.get("timestamp", ""), } ) self.stats["write_calls"] += 1 except json.JSONDecodeError: continue except Exception as e: print(f"Warning: Error processing line {line_num}: {e}", file=sys.stderr) continue return write_calls def extract_edit_calls(self) -> List[Dict[str, Any]]: """Extract all Edit tool calls from session.""" edit_calls = [] with open(self.session_file, "r") as f: for line_num, line in enumerate(f, 1): try: data = json.loads(line.strip()) role = data.get("role") or data.get("message", {}).get("role") if role != "assistant": continue content = data.get("content") or data.get("message", {}).get( "content", [] ) for item in content: if not isinstance(item, dict): continue if item.get("type") == "tool_use" and item.get("name") == "Edit": edit_input = item.get("input", {}) edit_calls.append( { "line": line_num, "file_path": edit_input.get("file_path", ""), "old_string": edit_input.get("old_string", ""), "new_string": edit_input.get("new_string", ""), "timestamp": data.get("timestamp", ""), } ) self.stats["edit_calls"] += 1 except Exception: continue return edit_calls def save_recovered_files( self, write_calls: List[Dict[str, Any]], keywords: Optional[List[str]] = None ) -> List[Dict[str, Any]]: """ Save recovered files to disk, preserving original directory structure. Args: write_calls: List of Write tool calls keywords: Optional keywords to filter files (matches any keyword in file path) Returns: List of saved file metadata """ saved = [] # Filter by keywords if provided if keywords: write_calls = [ call for call in write_calls if any(kw.lower() in call["file_path"].lower() for kw in keywords) ] # Deduplicate: keep latest version of each file files_by_path = {} for call in write_calls: file_path = call["file_path"] if not file_path: continue # Keep latest version (assuming chronological order in session) files_by_path[file_path] = call # Save files for file_path, call in files_by_path.items(): try: if not file_path: continue # Preserve original directory structure # Convert absolute path to relative path within output directory original_path = Path(file_path) # Handle absolute paths: extract meaningful relative path # e.g., /Users/username/project/src/file.py -> src/file.py # e.g., /home/user/workspace/project/lib/module.py -> lib/module.py path_parts = original_path.parts if len(path_parts) > 1 and path_parts[0] == "/": # For absolute paths, try to find a project-like directory # Skip leading /, Users/username, home/username patterns start_idx = 1 # Skip leading "/" if len(path_parts) > 2 and path_parts[1].lower() in ("users", "home", "user"): start_idx = 3 # Skip /Users/username or /home/user relative_parts = path_parts[start_idx:] else: relative_parts = path_parts # Construct output path preserving structure if relative_parts: output_file = self.output_dir.joinpath(*relative_parts) else: # Fallback to filename only if path is too shallow output_file = self.output_dir / original_path.name # Create parent directories output_file.parent.mkdir(parents=True, exist_ok=True) with open(output_file, "w") as f: f.write(call["content"]) saved.append( { "file": output_file.name, "original_path": file_path, "size": len(call["content"]), "lines": call["content"].count("\n") + 1, "timestamp": call.get("timestamp", "unknown"), "output_path": str(output_file), } ) self.stats["files_recovered"] += 1 except Exception as e: print(f"Warning: Failed to save {file_path}: {e}", file=sys.stderr) continue return saved def generate_report(self, saved_files: List[Dict[str, Any]]) -> str: """Generate recovery report.""" report_lines = [ "=" * 60, "Claude Code Session Content Recovery Report", "=" * 60, "", f"Session file: {self.session_file}", f"Output directory: {self.output_dir}", "", "Statistics:", f" Total lines processed: {self.stats['total_lines']:,}", f" Write tool calls found: {self.stats['write_calls']}", f" Edit tool calls found: {self.stats['edit_calls']}", f" Files recovered: {self.stats['files_recovered']}", "", ] if saved_files: report_lines.extend( [ "Recovered Files:", "", ] ) for item in saved_files: report_lines.extend( [ f"✅ {item['file']}", f" Original: {item['original_path']}", f" Size: {item['size']:,} characters", f" Lines: {item['lines']:,}", f" Saved to: {item['output_path']}", "", ] ) else: report_lines.append("No files recovered (no matches or no Write calls found)") report_lines.append("") report_lines.extend(["=" * 60, ""]) return "\n".join(report_lines) def main(): """Main entry point.""" import argparse parser = argparse.ArgumentParser( description="Recover content from Claude Code session history files" ) parser.add_argument( "session_file", type=Path, help="Path to Claude Code session JSONL file", ) parser.add_argument( "-o", "--output", type=Path, help="Output directory (default: ./recovered_content)", ) parser.add_argument( "-k", "--keywords", nargs="+", help="Filter files by keywords (matches any keyword in file path)", ) parser.add_argument( "--show-edits", action="store_true", help="Also show Edit operations (not saved, just listed)", ) args = parser.parse_args() # Validate session file exists if not args.session_file.exists(): print(f"Error: Session file not found: {args.session_file}", file=sys.stderr) sys.exit(1) # Create recovery instance recovery = SessionContentRecovery(args.session_file, args.output) print(f"🔍 Analyzing session: {args.session_file}") print(f"📂 Output directory: {recovery.output_dir}\n") # Extract Write calls print("1️⃣ Extracting Write tool calls...") write_calls = recovery.extract_write_calls() print(f" Found {len(write_calls)} Write calls\n") # Save files print("2️⃣ Saving recovered files...") if args.keywords: print(f" Filtering by keywords: {', '.join(args.keywords)}") saved = recovery.save_recovered_files(write_calls, args.keywords) print(f" Saved {len(saved)} files\n") # Optionally show edits if args.show_edits: print("3️⃣ Extracting Edit tool calls...") edit_calls = recovery.extract_edit_calls() print(f" Found {len(edit_calls)} Edit calls") if edit_calls: print("\n Recent edits:") for edit in edit_calls[-5:]: # Show last 5 print(f" - {Path(edit['file_path']).name} (line {edit['line']})") print() # Generate and print report report = recovery.generate_report(saved) print(report) # Save report report_file = recovery.output_dir / "recovery_report.txt" with open(report_file, "w") as f: f.write(report) print(f"📄 Report saved to: {report_file}\n") if __name__ == "__main__": main()