Replace hardcoded user paths that triggered gitleaks PII detection: - /Users/username/ → ~/ - /Users/user/ → ~/ - -Users-username- → -Users-<username>- (normalized paths) Also fix the sed example to use <home> placeholder instead of regex pattern that would match actual usernames. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
377 lines
14 KiB
Python
Executable File
377 lines
14 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Analyze Claude Code session files to find relevant sessions and statistics.
|
|
|
|
This script helps locate sessions containing specific keywords, analyze
|
|
session activity, and generate reports about session content.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from datetime import datetime
|
|
from collections import defaultdict
|
|
|
|
|
|
class SessionAnalyzer:
    """Analyze Claude Code session history files.

    Session histories are read as JSONL files (one JSON value per line)
    stored in per-project subdirectories of ``projects_dir``. Lines that are
    not valid JSON, or that are not shaped like a message record, are skipped
    rather than aborting the analysis of the whole file.
    """

    # Tool names that have a dedicated call counter in the stats dictionary.
    _CALL_COUNTERS = {
        "Write": "write_calls",
        "Edit": "edit_calls",
        "Read": "read_calls",
        "Bash": "bash_calls",
    }

    # Tool names whose ``file_path`` input is recorded as a file operation,
    # mapped to the label stored in ``stats["file_operations"]``.
    _FILE_OPERATIONS = {"Write": "write", "Edit": "edit"}

    def __init__(self, projects_dir: Optional[Path] = None):
        """
        Initialize analyzer.

        Args:
            projects_dir: Path to Claude projects directory
                (default: ~/.claude/projects)
        """
        if projects_dir:
            self.projects_dir = Path(projects_dir)
        else:
            self.projects_dir = Path.home() / ".claude" / "projects"

    def find_project_sessions(self, project_path: str) -> List[Path]:
        """
        Find all session files for a specific project.

        Args:
            project_path: Project path (e.g., ~/Workspace/js/myproject).
                A leading ``~`` is expanded to the user's home directory and
                a trailing slash is ignored.

        Returns:
            List of session file paths, most recently modified first.
            Empty if the project directory does not exist.
        """
        # Convert project path to Claude's directory naming
        # Example: ~/Workspace/js/myproject -> -Users-<username>-Workspace-js-myproject
        # The "~" must be expanded first, otherwise it would be kept literally
        # and the resulting directory name would never match.
        expanded = os.path.expanduser(project_path)
        # Drop trailing slashes so "proj/" and "proj" name the same directory;
        # keep the value as-is when it consists only of slashes.
        trimmed = expanded.rstrip("/") or expanded
        normalized = trimmed.replace("/", "-")
        if not normalized:
            # An empty name would resolve to projects_dir itself.
            return []

        project_dir = self.projects_dir / normalized
        if not project_dir.is_dir():
            return []

        # Find all session JSONL files (exclude agent files)
        sessions = [
            candidate
            for candidate in project_dir.glob("*.jsonl")
            if not candidate.name.startswith("agent-")
        ]
        return sorted(sessions, key=lambda p: p.stat().st_mtime, reverse=True)

    def search_sessions(
        self, sessions: List[Path], keywords: List[str], case_sensitive: bool = False
    ) -> Dict[Path, Dict[str, Any]]:
        """
        Search sessions for keywords.

        Args:
            sessions: List of session file paths
            keywords: Keywords to search for (empty strings are ignored)
            case_sensitive: Whether to perform case-sensitive search

        Returns:
            Dict mapping session paths to match information. Only sessions
            with at least one match are included. Each value holds
            ``total_mentions``, ``keyword_counts``, ``modified_time`` and
            ``size``.
        """
        # Prepare the search form of every keyword once instead of per line.
        # Empty keywords are dropped: str.count("") matches at every position
        # and would report meaningless counts.
        needles = [
            (keyword, keyword if case_sensitive else keyword.lower())
            for keyword in keywords
            if keyword
        ]

        matches: Dict[Path, Dict[str, Any]] = {}

        for session_file in sessions:
            keyword_counts: Dict[str, int] = defaultdict(int)
            total_mentions = 0

            try:
                with open(session_file, "r", encoding="utf-8") as f:
                    for line in f:
                        try:
                            data = json.loads(line.strip())
                        except json.JSONDecodeError:
                            continue

                        # Extract text content from message
                        text_content = self._extract_text_content(data)
                        if not text_content:
                            continue

                        # Search for keywords
                        search_text = (
                            text_content if case_sensitive else text_content.lower()
                        )
                        for keyword, needle in needles:
                            count = search_text.count(needle)
                            if count > 0:
                                keyword_counts[keyword] += count
                                total_mentions += count

                if total_mentions > 0:
                    file_info = session_file.stat()
                    matches[session_file] = {
                        "total_mentions": total_mentions,
                        "keyword_counts": dict(keyword_counts),
                        "modified_time": file_info.st_mtime,
                        "size": file_info.st_size,
                    }

            except Exception as e:
                # Deliberate best-effort boundary: an unreadable session is
                # reported and skipped so the remaining sessions are searched.
                print(
                    f"Warning: Error processing {session_file}: {e}", file=sys.stderr
                )
                continue

        return matches

    def get_session_stats(self, session_file: Path) -> Dict[str, Any]:
        """
        Get detailed statistics for a session file.

        Args:
            session_file: Path to session JSONL file

        Returns:
            Dictionary of session statistics: line and message counts,
            per-tool usage counts, and the list of ``(operation, file_path)``
            tuples recorded for Write and Edit tool calls.
        """
        tool_uses: Dict[str, int] = defaultdict(int)
        file_operations: List[Any] = []
        stats: Dict[str, Any] = {
            "total_lines": 0,
            "user_messages": 0,
            "assistant_messages": 0,
            "tool_uses": tool_uses,
            "write_calls": 0,
            "edit_calls": 0,
            "read_calls": 0,
            "bash_calls": 0,
            "file_operations": file_operations,
        }

        try:
            with open(session_file, "r", encoding="utf-8") as f:
                for line in f:
                    stats["total_lines"] += 1

                    try:
                        data = json.loads(line.strip())
                    except json.JSONDecodeError:
                        continue

                    # Count message types
                    role = self._message_field(data, "role")
                    if role == "user":
                        stats["user_messages"] += 1
                    elif role == "assistant":
                        stats["assistant_messages"] += 1

                    # Analyze tool uses; only list-shaped content can hold them
                    content = self._message_field(data, "content")
                    if not isinstance(content, list):
                        continue

                    for item in content:
                        if not isinstance(item, dict):
                            continue
                        if item.get("type") != "tool_use":
                            continue

                        tool_name = item.get("name", "unknown")
                        if not isinstance(tool_name, str):
                            # Keep the counter keys hashable and printable.
                            tool_name = "unknown"
                        tool_uses[tool_name] += 1

                        counter_key = self._CALL_COUNTERS.get(tool_name)
                        if counter_key:
                            stats[counter_key] += 1

                        # Track file operations
                        operation = self._FILE_OPERATIONS.get(tool_name)
                        if operation:
                            tool_input = item.get("input")
                            file_path = (
                                tool_input.get("file_path", "")
                                if isinstance(tool_input, dict)
                                else ""
                            )
                            if isinstance(file_path, str) and file_path:
                                file_operations.append((operation, file_path))

        except Exception as e:
            # Best effort: report the problem and return what was gathered.
            print(f"Error analyzing {session_file}: {e}", file=sys.stderr)

        # Convert defaultdict to regular dict
        stats["tool_uses"] = dict(tool_uses)

        return stats

    @staticmethod
    def _message_field(data: Any, key: str) -> Any:
        """Return ``data[key]``, falling back to ``data["message"][key]``.

        Returns None when ``data`` is not a dict, or when neither location
        holds a truthy/present value.
        """
        if not isinstance(data, dict):
            return None
        value = data.get(key)
        if value:
            return value
        message = data.get("message")
        if isinstance(message, dict):
            return message.get(key)
        return None

    def _extract_text_content(self, data: Dict[str, Any]) -> str:
        """Extract all searchable text from a message record.

        Collects plain string content, the ``text`` of text items, and the
        ``file_path`` and ``content`` inputs of tool_use items, joined with
        single spaces. Returns an empty string for records without such text.
        """
        # Get content from either location
        content = self._message_field(data, "content")

        if isinstance(content, str):
            return content
        if not isinstance(content, list):
            return ""

        text_parts: List[str] = []
        for item in content:
            if not isinstance(item, dict):
                continue

            item_type = item.get("type")
            if item_type == "text":
                candidates = [item.get("text", "")]
            elif item_type == "tool_use":
                # Also check tool inputs for file paths and Write contents
                tool_input = item.get("input", {})
                if not isinstance(tool_input, dict):
                    continue
                candidates = [
                    tool_input[field]
                    for field in ("file_path", "content")
                    if field in tool_input
                ]
            else:
                continue

            # Only strings can be joined and searched; skip anything else.
            text_parts.extend(part for part in candidates if isinstance(part, str))

        return " ".join(text_parts)
|
|
|
|
|
|
# NOTE(review): this file was recovered from a rendering that stripped
# indentation; leading whitespace inside the printed strings below may have
# been collapsed as well - compare against the upstream file before relying
# on exact column alignment.


def _run_list(analyzer: Any, args: Any) -> None:
    """Print the most recent sessions of a project (``list`` subcommand)."""
    sessions = analyzer.find_project_sessions(args.project_path)
    if not sessions:
        print(f"No sessions found for project: {args.project_path}")
        sys.exit(1)

    # A negative limit would slice from the wrong end; treat it as zero.
    limit = max(args.limit, 0)

    print(f"Found {len(sessions)} session(s) for {args.project_path}\n")
    print(f"Showing {min(limit, len(sessions))} most recent:\n")

    for i, session in enumerate(sessions[:limit], 1):
        file_info = session.stat()
        mtime = datetime.fromtimestamp(file_info.st_mtime)
        size_kb = file_info.st_size / 1024
        print(f"{i}. {session.name}")
        print(f" Modified: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f" Size: {size_kb:.1f} KB")
        print(f" Path: {session}")
        print()


def _run_search(analyzer: Any, args: Any) -> None:
    """Print sessions matching the keywords, most mentions first (``search``)."""
    sessions = analyzer.find_project_sessions(args.project_path)
    if not sessions:
        print(f"No sessions found for project: {args.project_path}")
        sys.exit(1)

    print(f"Searching {len(sessions)} session(s) for: {', '.join(args.keywords)}\n")

    matches = analyzer.search_sessions(sessions, args.keywords, args.case_sensitive)

    if not matches:
        print("No matches found.")
        sys.exit(0)

    # Sort by total mentions
    sorted_matches = sorted(
        matches.items(), key=lambda x: x[1]["total_mentions"], reverse=True
    )

    print(f"Found {len(matches)} session(s) with matches:\n")

    for session, info in sorted_matches:
        mtime = datetime.fromtimestamp(info["modified_time"])
        keyword_summary = ", ".join(
            f"{k}({v})" for k, v in info["keyword_counts"].items()
        )
        print(f"📄 {session.name}")
        print(f" Date: {mtime.strftime('%Y-%m-%d %H:%M')}")
        print(f" Total mentions: {info['total_mentions']}")
        print(f" Keywords: {keyword_summary}")
        print(f" Path: {session}")
        print()


def _run_stats(analyzer: Any, args: Any) -> None:
    """Print message, tool and file-operation statistics (``stats``)."""
    if not args.session_file.exists():
        print(f"Error: Session file not found: {args.session_file}")
        sys.exit(1)

    print(f"Analyzing session: {args.session_file}\n")

    stats = analyzer.get_session_stats(args.session_file)

    print("=" * 60)
    print("Session Statistics")
    print("=" * 60)
    print("\nMessages:")
    print(f" Total lines: {stats['total_lines']:,}")
    print(f" User messages: {stats['user_messages']}")
    print(f" Assistant messages: {stats['assistant_messages']}")

    print("\nTool Usage:")
    print(f" Write calls: {stats['write_calls']}")
    print(f" Edit calls: {stats['edit_calls']}")
    print(f" Read calls: {stats['read_calls']}")
    print(f" Bash calls: {stats['bash_calls']}")

    if stats["tool_uses"]:
        print("\n All tools:")
        for tool, count in sorted(
            stats["tool_uses"].items(), key=lambda x: x[1], reverse=True
        ):
            print(f" {tool}: {count}")

    if args.show_files and stats["file_operations"]:
        print(f"\nFile Operations ({len(stats['file_operations'])}):")
        # Group by file
        files = defaultdict(list)
        for op, path in stats["file_operations"]:
            files[path].append(op)

        # Limit to 20 files to prevent terminal flooding on large sessions
        for file_path, ops in list(files.items())[:20]:
            filename = Path(file_path).name
            # Sorted so the summary order is stable between runs; iterating a
            # bare set of strings depends on per-process hash randomization.
            op_summary = ", ".join(
                f"{op}({ops.count(op)})" for op in sorted(set(ops))
            )
            print(f" {filename}")
            print(f" Operations: {op_summary}")
            print(f" Path: {file_path}")

    # NOTE(review): the nesting level of this trailing blank line could not be
    # recovered from the de-indented source; it is emitted once, after the
    # whole report - confirm against the upstream file.
    print()


def main(argv: Optional[List[str]] = None):
    """Main entry point.

    Parses the command line and dispatches to the ``list``, ``search`` or
    ``stats`` subcommand. Prints the help text and exits with status 1 when
    no subcommand is given.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``
            (default: None, meaning use the process arguments).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Analyze Claude Code session history files"
    )

    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # List sessions command
    list_parser = subparsers.add_parser("list", help="List all sessions for a project")
    list_parser.add_argument("project_path", help="Project path")
    list_parser.add_argument(
        "--limit", type=int, default=10, help="Max sessions to show (default: 10)"
    )

    # Search command
    search_parser = subparsers.add_parser("search", help="Search sessions for keywords")
    search_parser.add_argument("project_path", help="Project path")
    search_parser.add_argument(
        "keywords", nargs="+", help="Keywords to search for"
    )
    search_parser.add_argument(
        "--case-sensitive", action="store_true", help="Case-sensitive search"
    )

    # Stats command
    stats_parser = subparsers.add_parser("stats", help="Get session statistics")
    stats_parser.add_argument("session_file", type=Path, help="Session file path")
    stats_parser.add_argument(
        "--show-files", action="store_true", help="Show file operations"
    )

    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        sys.exit(1)

    analyzer = SessionAnalyzer()

    if args.command == "list":
        _run_list(analyzer, args)
    elif args.command == "search":
        _run_search(analyzer, args)
    elif args.command == "stats":
        _run_stats(analyzer, args)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|