Files
daymade 22de8f043c Fix claude-code-history-files-finder: preserve directory structure on recovery
Previously, recover_content.py saved all files flat in the output directory,
causing files with the same name (e.g., src/utils.py and tests/utils.py) to
overwrite each other.

Now the script preserves the original directory structure, creating subdirectories
as needed within the output directory.

- Bump version: 1.0.0 → 1.0.1

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-05 13:32:54 +08:00

333 lines
12 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Recover content from Claude Code history session files.
This script extracts Write tool calls, Edit operations, and text content
from Claude Code's JSONL session history files.
"""
import json
import sys
import os
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
class SessionContentRecovery:
"""Extract and recover content from Claude Code session files."""
def __init__(self, session_file: Path, output_dir: Optional[Path] = None):
self.session_file = Path(session_file)
self.output_dir = output_dir or Path.cwd() / "recovered_content"
self.output_dir.mkdir(exist_ok=True)
# Statistics
self.stats = {
"total_lines": 0,
"write_calls": 0,
"edit_calls": 0,
"text_mentions": 0,
"files_recovered": 0,
}
def extract_write_calls(self) -> List[Dict[str, Any]]:
"""Extract all Write tool calls from session."""
write_calls = []
with open(self.session_file, "r") as f:
for line_num, line in enumerate(f, 1):
self.stats["total_lines"] += 1
try:
data = json.loads(line.strip())
# Check both direct role and nested message.role
role = data.get("role") or data.get("message", {}).get("role")
if role != "assistant":
continue
# Get content from either location
content = data.get("content") or data.get("message", {}).get(
"content", []
)
for item in content:
if not isinstance(item, dict):
continue
# Look for Write tool calls
if item.get("type") == "tool_use" and item.get("name") == "Write":
write_input = item.get("input", {})
write_calls.append(
{
"line": line_num,
"file_path": write_input.get("file_path", ""),
"content": write_input.get("content", ""),
"timestamp": data.get("timestamp", ""),
}
)
self.stats["write_calls"] += 1
except json.JSONDecodeError:
continue
except Exception as e:
print(f"Warning: Error processing line {line_num}: {e}", file=sys.stderr)
continue
return write_calls
def extract_edit_calls(self) -> List[Dict[str, Any]]:
"""Extract all Edit tool calls from session."""
edit_calls = []
with open(self.session_file, "r") as f:
for line_num, line in enumerate(f, 1):
try:
data = json.loads(line.strip())
role = data.get("role") or data.get("message", {}).get("role")
if role != "assistant":
continue
content = data.get("content") or data.get("message", {}).get(
"content", []
)
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "tool_use" and item.get("name") == "Edit":
edit_input = item.get("input", {})
edit_calls.append(
{
"line": line_num,
"file_path": edit_input.get("file_path", ""),
"old_string": edit_input.get("old_string", ""),
"new_string": edit_input.get("new_string", ""),
"timestamp": data.get("timestamp", ""),
}
)
self.stats["edit_calls"] += 1
except Exception:
continue
return edit_calls
def save_recovered_files(
self, write_calls: List[Dict[str, Any]], keywords: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
Save recovered files to disk, preserving original directory structure.
Args:
write_calls: List of Write tool calls
keywords: Optional keywords to filter files (matches any keyword in file path)
Returns:
List of saved file metadata
"""
saved = []
# Filter by keywords if provided
if keywords:
write_calls = [
call
for call in write_calls
if any(kw.lower() in call["file_path"].lower() for kw in keywords)
]
# Deduplicate: keep latest version of each file
files_by_path = {}
for call in write_calls:
file_path = call["file_path"]
if not file_path:
continue
# Keep latest version (assuming chronological order in session)
files_by_path[file_path] = call
# Save files
for file_path, call in files_by_path.items():
try:
if not file_path:
continue
# Preserve original directory structure
# Convert absolute path to relative path within output directory
original_path = Path(file_path)
# Handle absolute paths: extract meaningful relative path
# e.g., /Users/username/project/src/file.py -> src/file.py
# e.g., /home/user/workspace/project/lib/module.py -> lib/module.py
path_parts = original_path.parts
if len(path_parts) > 1 and path_parts[0] == "/":
# For absolute paths, try to find a project-like directory
# Skip leading /, Users/username, home/username patterns
start_idx = 1 # Skip leading "/"
if len(path_parts) > 2 and path_parts[1].lower() in ("users", "home", "user"):
start_idx = 3 # Skip /Users/username or /home/user
relative_parts = path_parts[start_idx:]
else:
relative_parts = path_parts
# Construct output path preserving structure
if relative_parts:
output_file = self.output_dir.joinpath(*relative_parts)
else:
# Fallback to filename only if path is too shallow
output_file = self.output_dir / original_path.name
# Create parent directories
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, "w") as f:
f.write(call["content"])
saved.append(
{
"file": output_file.name,
"original_path": file_path,
"size": len(call["content"]),
"lines": call["content"].count("\n") + 1,
"timestamp": call.get("timestamp", "unknown"),
"output_path": str(output_file),
}
)
self.stats["files_recovered"] += 1
except Exception as e:
print(f"Warning: Failed to save {file_path}: {e}", file=sys.stderr)
continue
return saved
def generate_report(self, saved_files: List[Dict[str, Any]]) -> str:
"""Generate recovery report."""
report_lines = [
"=" * 60,
"Claude Code Session Content Recovery Report",
"=" * 60,
"",
f"Session file: {self.session_file}",
f"Output directory: {self.output_dir}",
"",
"Statistics:",
f" Total lines processed: {self.stats['total_lines']:,}",
f" Write tool calls found: {self.stats['write_calls']}",
f" Edit tool calls found: {self.stats['edit_calls']}",
f" Files recovered: {self.stats['files_recovered']}",
"",
]
if saved_files:
report_lines.extend(
[
"Recovered Files:",
"",
]
)
for item in saved_files:
report_lines.extend(
[
f"{item['file']}",
f" Original: {item['original_path']}",
f" Size: {item['size']:,} characters",
f" Lines: {item['lines']:,}",
f" Saved to: {item['output_path']}",
"",
]
)
else:
report_lines.append("No files recovered (no matches or no Write calls found)")
report_lines.append("")
report_lines.extend(["=" * 60, ""])
return "\n".join(report_lines)
def main():
    """CLI entry point: parse arguments, run recovery, print and save a report."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Recover content from Claude Code session history files"
    )
    add_arg = parser.add_argument
    add_arg(
        "session_file",
        type=Path,
        help="Path to Claude Code session JSONL file",
    )
    add_arg(
        "-o",
        "--output",
        type=Path,
        help="Output directory (default: ./recovered_content)",
    )
    add_arg(
        "-k",
        "--keywords",
        nargs="+",
        help="Filter files by keywords (matches any keyword in file path)",
    )
    add_arg(
        "--show-edits",
        action="store_true",
        help="Also show Edit operations (not saved, just listed)",
    )
    args = parser.parse_args()

    # Guard clause: refuse to continue without a readable session file.
    if not args.session_file.exists():
        print(f"Error: Session file not found: {args.session_file}", file=sys.stderr)
        sys.exit(1)

    recovery = SessionContentRecovery(args.session_file, args.output)
    print(f"🔍 Analyzing session: {args.session_file}")
    print(f"📂 Output directory: {recovery.output_dir}\n")

    # Step 1: collect Write tool calls from the session.
    print("1⃣ Extracting Write tool calls...")
    write_calls = recovery.extract_write_calls()
    print(f" Found {len(write_calls)} Write calls\n")

    # Step 2: persist recovered files (optionally keyword-filtered).
    print("2⃣ Saving recovered files...")
    if args.keywords:
        print(f" Filtering by keywords: {', '.join(args.keywords)}")
    saved = recovery.save_recovered_files(write_calls, args.keywords)
    print(f" Saved {len(saved)} files\n")

    # Step 3 (optional): list Edit operations without saving them.
    if args.show_edits:
        print("3⃣ Extracting Edit tool calls...")
        edit_calls = recovery.extract_edit_calls()
        print(f" Found {len(edit_calls)} Edit calls")
        if edit_calls:
            print("\n Recent edits:")
            for edit in edit_calls[-5:]:  # only the five most recent
                print(f" - {Path(edit['file_path']).name} (line {edit['line']})")
        print()

    # Print the summary, then persist it next to the recovered files.
    report = recovery.generate_report(saved)
    print(report)
    report_file = recovery.output_dir / "recovery_report.txt"
    with open(report_file, "w") as f:
        f.write(report)
    print(f"📄 Report saved to: {report_file}\n")
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()