#!/usr/bin/env python3
"""
Analyze Claude Code session files to find relevant sessions and statistics.

This script helps locate sessions containing specific keywords, analyze
session activity, and generate reports about session content.
"""

import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
from collections import defaultdict


class SessionAnalyzer:
    """Analyze Claude Code session history files."""

    def __init__(self, projects_dir: Optional[Path] = None):
        """
        Initialize analyzer.

        Args:
            projects_dir: Path to Claude projects directory (default: ~/.claude/projects)
        """
        if projects_dir:
            self.projects_dir = Path(projects_dir)
        else:
            self.projects_dir = Path.home() / ".claude" / "projects"

    def find_project_sessions(self, project_path: str) -> List[Path]:
        """
        Find all session files for a specific project.

        Args:
            project_path: Project path (e.g., /Users/user/Workspace/js/myproject)

        Returns:
            List of session file paths
        """
        # Convert project path to Claude's directory naming
        # Example: /Users/user/Workspace/js/myproject -> -Users-user-Workspace-js-myproject
        normalized = project_path.replace("/", "-")
        project_dir = self.projects_dir / normalized

        if not project_dir.exists():
            return []

        # Find all session JSONL files (exclude agent files)
        sessions = []
        for file in project_dir.glob("*.jsonl"):
            if not file.name.startswith("agent-"):
                sessions.append(file)

        return sorted(sessions, key=lambda p: p.stat().st_mtime, reverse=True)

    def search_sessions(
        self,
        sessions: List[Path],
        keywords: List[str],
        case_sensitive: bool = False,
    ) -> Dict[Path, Dict[str, Any]]:
        """
        Search sessions for keywords.

        Args:
            sessions: List of session file paths
            keywords: Keywords to search for
            case_sensitive: Whether to perform case-sensitive search

        Returns:
            Dict mapping session paths to match information
        """
        matches = {}

        for session_file in sessions:
            keyword_counts = defaultdict(int)
            total_mentions = 0

            try:
                with open(session_file, "r") as f:
                    for line in f:
                        try:
                            data = json.loads(line.strip())

                            # Extract text content from message
                            text_content = self._extract_text_content(data)

                            # Search for keywords
                            search_text = (
                                text_content if case_sensitive else text_content.lower()
                            )
                            for keyword in keywords:
                                search_keyword = (
                                    keyword if case_sensitive else keyword.lower()
                                )
                                count = search_text.count(search_keyword)
                                if count > 0:
                                    keyword_counts[keyword] += count
                                    total_mentions += count

                        except json.JSONDecodeError:
                            continue

                if total_mentions > 0:
                    matches[session_file] = {
                        "total_mentions": total_mentions,
                        "keyword_counts": dict(keyword_counts),
                        "modified_time": session_file.stat().st_mtime,
                        "size": session_file.stat().st_size,
                    }

            except Exception as e:
                print(
                    f"Warning: Error processing {session_file}: {e}", file=sys.stderr
                )
                continue

        return matches

    def get_session_stats(self, session_file: Path) -> Dict[str, Any]:
        """
        Get detailed statistics for a session file.

        Args:
            session_file: Path to session JSONL file

        Returns:
            Dictionary of session statistics
        """
        stats = {
            "total_lines": 0,
            "user_messages": 0,
            "assistant_messages": 0,
            "tool_uses": defaultdict(int),
            "write_calls": 0,
            "edit_calls": 0,
            "read_calls": 0,
            "bash_calls": 0,
            "file_operations": [],
        }

        try:
            with open(session_file, "r") as f:
                for line in f:
                    stats["total_lines"] += 1
                    try:
                        data = json.loads(line.strip())

                        # Count message types
                        role = data.get("role") or data.get("message", {}).get("role")
                        if role == "user":
                            stats["user_messages"] += 1
                        elif role == "assistant":
                            stats["assistant_messages"] += 1

                        # Analyze tool uses
                        content = data.get("content") or data.get("message", {}).get(
                            "content", []
                        )
                        for item in content:
                            if not isinstance(item, dict):
                                continue
                            if item.get("type") == "tool_use":
                                tool_name = item.get("name", "unknown")
                                stats["tool_uses"][tool_name] += 1

                                # Track file operations
                                if tool_name == "Write":
                                    stats["write_calls"] += 1
                                    file_path = item.get("input", {}).get(
                                        "file_path", ""
                                    )
                                    if file_path:
                                        stats["file_operations"].append(
                                            ("write", file_path)
                                        )
                                elif tool_name == "Edit":
                                    stats["edit_calls"] += 1
                                    file_path = item.get("input", {}).get(
                                        "file_path", ""
                                    )
                                    if file_path:
                                        stats["file_operations"].append(
                                            ("edit", file_path)
                                        )
                                elif tool_name == "Read":
                                    stats["read_calls"] += 1
                                elif tool_name == "Bash":
                                    stats["bash_calls"] += 1

                    except json.JSONDecodeError:
                        continue

        except Exception as e:
            print(f"Error analyzing {session_file}: {e}", file=sys.stderr)

        # Convert defaultdict to regular dict
        stats["tool_uses"] = dict(stats["tool_uses"])

        return stats

    def _extract_text_content(self, data: Dict[str, Any]) -> str:
        """Extract all text content from a message."""
        text_parts = []

        # Get content from either location
        content = data.get("content") or data.get("message", {}).get("content", [])

        if isinstance(content, str):
            text_parts.append(content)
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict):
                    if item.get("type") == "text":
                        text_parts.append(item.get("text", ""))
                    # Also check tool inputs for file paths etc
                    elif item.get("type") == "tool_use":
                        tool_input = item.get("input", {})
                        if isinstance(tool_input, dict):
                            # Add file paths from tool inputs
                            if "file_path" in tool_input:
                                text_parts.append(tool_input["file_path"])
                            # Add content from Write calls
                            if "content" in tool_input:
                                text_parts.append(tool_input["content"])

        return " ".join(text_parts)


def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Analyze Claude Code session history files"
    )
    subparsers = parser.add_subparsers(dest="command", help="Command to run")

    # List sessions command
    list_parser = subparsers.add_parser("list", help="List all sessions for a project")
    list_parser.add_argument("project_path", help="Project path")
    list_parser.add_argument(
        "--limit", type=int, default=10, help="Max sessions to show (default: 10)"
    )

    # Search command
    search_parser = subparsers.add_parser("search", help="Search sessions for keywords")
    search_parser.add_argument("project_path", help="Project path")
    search_parser.add_argument(
        "keywords", nargs="+", help="Keywords to search for"
    )
    search_parser.add_argument(
        "--case-sensitive", action="store_true", help="Case-sensitive search"
    )

    # Stats command
    stats_parser = subparsers.add_parser("stats", help="Get session statistics")
    stats_parser.add_argument("session_file", type=Path, help="Session file path")
    stats_parser.add_argument(
        "--show-files", action="store_true", help="Show file operations"
    )

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    analyzer = SessionAnalyzer()

    if args.command == "list":
        sessions = analyzer.find_project_sessions(args.project_path)

        if not sessions:
            print(f"No sessions found for project: {args.project_path}")
            sys.exit(1)

        print(f"Found {len(sessions)} session(s) for {args.project_path}\n")
        print(f"Showing {min(args.limit, len(sessions))} most recent:\n")

        for i, session in enumerate(sessions[: args.limit], 1):
            mtime = datetime.fromtimestamp(session.stat().st_mtime)
            size_kb = session.stat().st_size / 1024
            print(f"{i}. {session.name}")
            print(f"   Modified: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"   Size: {size_kb:.1f} KB")
            print(f"   Path: {session}")
            print()

    elif args.command == "search":
        sessions = analyzer.find_project_sessions(args.project_path)

        if not sessions:
            print(f"No sessions found for project: {args.project_path}")
            sys.exit(1)

        print(f"Searching {len(sessions)} session(s) for: {', '.join(args.keywords)}\n")

        matches = analyzer.search_sessions(
            sessions, args.keywords, args.case_sensitive
        )

        if not matches:
            print("No matches found.")
            sys.exit(0)

        # Sort by total mentions
        sorted_matches = sorted(
            matches.items(), key=lambda x: x[1]["total_mentions"], reverse=True
        )

        print(f"Found {len(matches)} session(s) with matches:\n")

        for session, info in sorted_matches:
            mtime = datetime.fromtimestamp(info["modified_time"])
            print(f"📄 {session.name}")
            print(f"   Date: {mtime.strftime('%Y-%m-%d %H:%M')}")
            print(f"   Total mentions: {info['total_mentions']}")
            print(f"   Keywords: {', '.join(f'{k}({v})' for k, v in info['keyword_counts'].items())}")
            print(f"   Path: {session}")
            print()

    elif args.command == "stats":
        if not args.session_file.exists():
            print(f"Error: Session file not found: {args.session_file}")
            sys.exit(1)

        print(f"Analyzing session: {args.session_file}\n")

        stats = analyzer.get_session_stats(args.session_file)

        print("=" * 60)
        print("Session Statistics")
        print("=" * 60)
        print("\nMessages:")
        print(f"  Total lines: {stats['total_lines']:,}")
        print(f"  User messages: {stats['user_messages']}")
        print(f"  Assistant messages: {stats['assistant_messages']}")
        print("\nTool Usage:")
        print(f"  Write calls: {stats['write_calls']}")
        print(f"  Edit calls: {stats['edit_calls']}")
        print(f"  Read calls: {stats['read_calls']}")
        print(f"  Bash calls: {stats['bash_calls']}")

        if stats["tool_uses"]:
            print("\n  All tools:")
            for tool, count in sorted(
                stats["tool_uses"].items(), key=lambda x: x[1], reverse=True
            ):
                print(f"    {tool}: {count}")

        if args.show_files and stats["file_operations"]:
            print(f"\nFile Operations ({len(stats['file_operations'])}):")

            # Group by file
            files = defaultdict(list)
            for op, path in stats["file_operations"]:
                files[path].append(op)

            # Limit to 20 files to prevent terminal flooding on large sessions
            for file_path, ops in list(files.items())[:20]:
                filename = Path(file_path).name
                op_summary = ", ".join(
                    f"{op}({ops.count(op)})" for op in set(ops)
                )
                print(f"  {filename}")
                print(f"    Operations: {op_summary}")
                print(f"    Path: {file_path}")
                print()


if __name__ == "__main__":
    main()
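
# ---------------------------------------------------------------------------
# Example invocations (illustrative sketch; the script filename
# "analyze_sessions.py", the project path, and the session-id placeholder are
# assumptions -- only the "list", "search", and "stats" subcommands and their
# flags come from the argparse definitions above):
#
#   python analyze_sessions.py list /Users/user/Workspace/js/myproject --limit 5
#   python analyze_sessions.py search /Users/user/Workspace/js/myproject refactor "unit test"
#   python analyze_sessions.py stats ~/.claude/projects/-Users-user-Workspace-js-myproject/<session-id>.jsonl --show-files
# ---------------------------------------------------------------------------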