Previously, recover_content.py saved all files flat in the output directory, so files with the same name (e.g., src/utils.py and tests/utils.py) overwrote each other. The script now preserves the original directory structure, creating subdirectories as needed within the output directory.

- Bump version: 1.0.0 → 1.0.1

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
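
For illustration, a hypothetical before/after of the path handling (all paths invented for the example):

    Before (flat): both writes collide on the same output name
        /Users/alice/project/src/utils.py    -> recovered_content/utils.py
        /Users/alice/project/tests/utils.py  -> recovered_content/utils.py

    After (structure-preserving): the /Users/<name> prefix is stripped, the rest is kept
        /Users/alice/project/src/utils.py    -> recovered_content/project/src/utils.py
        /Users/alice/project/tests/utils.py  -> recovered_content/project/tests/utils.py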
333 lines · 12 KiB · Python · Executable File
#!/usr/bin/env python3
"""
Recover content from Claude Code history session files.

This script extracts Write tool calls, Edit operations, and text content
from Claude Code's JSONL session history files.
"""

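# Example invocation (illustrative; the session path below is hypothetical and
# the exact location of Claude Code session files may vary by version):
#
#   python3 recover_content.py ~/.claude/projects/my-project/<session-id>.jsonl \
#       --output ./recovered_content --keywords utils
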
import json
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional


class SessionContentRecovery:
    """Extract and recover content from Claude Code session files."""
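
    # Typical use (sketch of the API defined below):
    #   recovery = SessionContentRecovery(Path("session.jsonl"))
    #   write_calls = recovery.extract_write_calls()
    #   saved = recovery.save_recovered_files(write_calls)
    #   print(recovery.generate_report(saved))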

    def __init__(self, session_file: Path, output_dir: Optional[Path] = None):
        self.session_file = Path(session_file)
        self.output_dir = output_dir or Path.cwd() / "recovered_content"
        # parents=True so a nested output path (e.g. -o a/b/c) is created as needed
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Statistics
        self.stats = {
            "total_lines": 0,
            "write_calls": 0,
            "edit_calls": 0,
            "text_mentions": 0,
            "files_recovered": 0,
        }

    def extract_write_calls(self) -> List[Dict[str, Any]]:
        """Extract all Write tool calls from session."""
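        # Session lines are JSON records; two shapes are handled below.
        # Illustrative field subset (assumed, not a complete schema):
        #   {"role": "assistant", "content": [{"type": "tool_use", "name": "Write", "input": {...}}]}
        #   {"message": {"role": "assistant", "content": [...]}, "timestamp": "..."}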
        write_calls = []

        with open(self.session_file, "r") as f:
            for line_num, line in enumerate(f, 1):
                self.stats["total_lines"] += 1

                try:
                    data = json.loads(line.strip())

                    # Check both direct role and nested message.role
                    role = data.get("role") or data.get("message", {}).get("role")
                    if role != "assistant":
                        continue

                    # Get content from either location
                    content = data.get("content") or data.get("message", {}).get(
                        "content", []
                    )

                    for item in content:
                        if not isinstance(item, dict):
                            continue

                        # Look for Write tool calls
                        if item.get("type") == "tool_use" and item.get("name") == "Write":
                            write_input = item.get("input", {})
                            write_calls.append(
                                {
                                    "line": line_num,
                                    "file_path": write_input.get("file_path", ""),
                                    "content": write_input.get("content", ""),
                                    "timestamp": data.get("timestamp", ""),
                                }
                            )
                            self.stats["write_calls"] += 1

                except json.JSONDecodeError:
                    continue
                except Exception as e:
                    print(f"Warning: Error processing line {line_num}: {e}", file=sys.stderr)
                    continue

        return write_calls

    def extract_edit_calls(self) -> List[Dict[str, Any]]:
        """Extract all Edit tool calls from session."""
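        # Note: Edit calls are extracted for reporting only; replaying them
        # (applying old_string -> new_string to recovered files) is left to
        # the caller and is not attempted here.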
        edit_calls = []

        with open(self.session_file, "r") as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data = json.loads(line.strip())

                    role = data.get("role") or data.get("message", {}).get("role")
                    if role != "assistant":
                        continue

                    content = data.get("content") or data.get("message", {}).get(
                        "content", []
                    )

                    for item in content:
                        if not isinstance(item, dict):
                            continue

                        if item.get("type") == "tool_use" and item.get("name") == "Edit":
                            edit_input = item.get("input", {})
                            edit_calls.append(
                                {
                                    "line": line_num,
                                    "file_path": edit_input.get("file_path", ""),
                                    "old_string": edit_input.get("old_string", ""),
                                    "new_string": edit_input.get("new_string", ""),
                                    "timestamp": data.get("timestamp", ""),
                                }
                            )
                            self.stats["edit_calls"] += 1

                except Exception:
                    continue

        return edit_calls

    def save_recovered_files(
        self, write_calls: List[Dict[str, Any]], keywords: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Save recovered files to disk, preserving original directory structure.

        Args:
            write_calls: List of Write tool calls
            keywords: Optional keywords to filter files (matches any keyword in file path)

        Returns:
            List of saved file metadata
        """
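        # Illustrative path mappings (hypothetical inputs), as implemented below:
        #   /Users/alice/project/src/utils.py -> <output_dir>/project/src/utils.py
        #   /home/bob/project/lib/module.py   -> <output_dir>/project/lib/module.py
        #   relative/path/file.py             -> <output_dir>/relative/path/file.py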
        saved = []

        # Filter by keywords if provided
        if keywords:
            write_calls = [
                call
                for call in write_calls
                if any(kw.lower() in call["file_path"].lower() for kw in keywords)
            ]

        # Deduplicate: keep latest version of each file
        files_by_path = {}
        for call in write_calls:
            file_path = call["file_path"]
            if not file_path:
                continue

            # Keep latest version (assuming chronological order in session)
            files_by_path[file_path] = call

        # Save files
        for file_path, call in files_by_path.items():
            try:
                # Preserve original directory structure by converting the
                # absolute path to a relative path within the output directory
                original_path = Path(file_path)

                # Handle absolute paths: strip the home-directory prefix
                # e.g., /Users/username/project/src/file.py -> project/src/file.py
                # e.g., /home/user/project/lib/module.py -> project/lib/module.py
                path_parts = original_path.parts
                if original_path.is_absolute() and len(path_parts) > 1:
                    start_idx = 1  # Skip the leading "/"
                    if len(path_parts) > 2 and path_parts[1].lower() in ("users", "home", "user"):
                        start_idx = 3  # Skip /Users/<name> or /home/<name>
                    relative_parts = path_parts[start_idx:]
                else:
                    relative_parts = path_parts

                # Construct output path preserving structure
                if relative_parts:
                    output_file = self.output_dir.joinpath(*relative_parts)
                else:
                    # Fallback to filename only if the path is too shallow
                    output_file = self.output_dir / original_path.name

                # Create parent directories
                output_file.parent.mkdir(parents=True, exist_ok=True)

                with open(output_file, "w") as f:
                    f.write(call["content"])

                saved.append(
                    {
                        "file": output_file.name,
                        "original_path": file_path,
                        "size": len(call["content"]),
                        "lines": call["content"].count("\n") + 1,
                        "timestamp": call.get("timestamp", "unknown"),
                        "output_path": str(output_file),
                    }
                )

                self.stats["files_recovered"] += 1

            except Exception as e:
                print(f"Warning: Failed to save {file_path}: {e}", file=sys.stderr)
                continue

        return saved

    def generate_report(self, saved_files: List[Dict[str, Any]]) -> str:
        """Generate recovery report."""
        report_lines = [
            "=" * 60,
            "Claude Code Session Content Recovery Report",
            "=" * 60,
            "",
            f"Session file: {self.session_file}",
            f"Output directory: {self.output_dir}",
            "",
            "Statistics:",
            f"  Total lines processed: {self.stats['total_lines']:,}",
            f"  Write tool calls found: {self.stats['write_calls']}",
            f"  Edit tool calls found: {self.stats['edit_calls']}",
            f"  Files recovered: {self.stats['files_recovered']}",
            "",
        ]

        if saved_files:
            report_lines.extend(
                [
                    "Recovered Files:",
                    "",
                ]
            )

            for item in saved_files:
                report_lines.extend(
                    [
                        f"✅ {item['file']}",
                        f"   Original: {item['original_path']}",
                        f"   Size: {item['size']:,} characters",
                        f"   Lines: {item['lines']:,}",
                        f"   Saved to: {item['output_path']}",
                        "",
                    ]
                )
        else:
            report_lines.append("No files recovered (no matches or no Write calls found)")
            report_lines.append("")

        report_lines.extend(["=" * 60, ""])

        return "\n".join(report_lines)


def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Recover content from Claude Code session history files"
    )
    parser.add_argument(
        "session_file",
        type=Path,
        help="Path to Claude Code session JSONL file",
    )
    parser.add_argument(
        "-o",
        "--output",
        type=Path,
        help="Output directory (default: ./recovered_content)",
    )
    parser.add_argument(
        "-k",
        "--keywords",
        nargs="+",
        help="Filter files by keywords (matches any keyword in file path)",
    )
    parser.add_argument(
        "--show-edits",
        action="store_true",
        help="Also show Edit operations (not saved, just listed)",
    )

    args = parser.parse_args()

    # Validate session file exists
    if not args.session_file.exists():
        print(f"Error: Session file not found: {args.session_file}", file=sys.stderr)
        sys.exit(1)

    # Create recovery instance
    recovery = SessionContentRecovery(args.session_file, args.output)

    print(f"🔍 Analyzing session: {args.session_file}")
    print(f"📂 Output directory: {recovery.output_dir}\n")

    # Extract Write calls
    print("1️⃣ Extracting Write tool calls...")
    write_calls = recovery.extract_write_calls()
    print(f"   Found {len(write_calls)} Write calls\n")

    # Save files
    print("2️⃣ Saving recovered files...")
    if args.keywords:
        print(f"   Filtering by keywords: {', '.join(args.keywords)}")
    saved = recovery.save_recovered_files(write_calls, args.keywords)
    print(f"   Saved {len(saved)} files\n")

    # Optionally show edits
    if args.show_edits:
        print("3️⃣ Extracting Edit tool calls...")
        edit_calls = recovery.extract_edit_calls()
        print(f"   Found {len(edit_calls)} Edit calls")
        if edit_calls:
            print("\n   Recent edits:")
            for edit in edit_calls[-5:]:  # Show last 5
                print(f"   - {Path(edit['file_path']).name} (line {edit['line']})")
        print()

    # Generate and print report
    report = recovery.generate_report(saved)
    print(report)

    # Save report
    report_file = recovery.output_dir / "recovery_report.txt"
    with open(report_file, "w") as f:
        f.write(report)
    print(f"📄 Report saved to: {report_file}\n")


if __name__ == "__main__":
    main()