Files
claude-code-skills-reference/continue-claude-work/scripts/extract_resume_context.py
daymade 2192458ef7 release: add scrapling-skill and fix script compatibility
- add scrapling-skill with validated CLI workflow, diagnostics, packaging, and docs integration
- fix skill-creator package_skill.py so direct script invocation works from repo root
- fix continue-claude-work extract_resume_context.py typing compatibility for local python3
- bump marketplace to 1.39.0 and updated skill versions
2026-03-18 23:08:55 +08:00

842 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Extract actionable resume context from Claude Code session files.
Produces a structured Markdown briefing by fusing:
- Session index metadata (sessions-index.json)
- Compact boundary summaries (highest-signal context)
- Post-compact user/assistant messages (the "hot zone")
- Subagent workflow state (multi-agent recovery)
- Session end reason detection
- Git workspace state
- MEMORY.md persistent context
- Interrupted tool-call detection
Usage:
# Extract context from latest session for current project
python3 extract_resume_context.py
# Extract context from a specific session
python3 extract_resume_context.py --session <SESSION_ID>
# Search sessions by topic
python3 extract_resume_context.py --query "auth feature"
# List recent sessions
python3 extract_resume_context.py --list
# Specify project path explicitly
python3 extract_resume_context.py --project /path/to/project
"""
import argparse
import json
import os
import re
import subprocess
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional
CLAUDE_DIR = Path.home() / ".claude"
PROJECTS_DIR = CLAUDE_DIR / "projects"
# Message types that are noise — skip when extracting context
NOISE_TYPES = {"progress", "queue-operation", "file-history-snapshot", "last-prompt"}
# System message subtypes that are noise
NOISE_SUBTYPES = {"api_error", "turn_duration", "stop_hook_summary"}
# Patterns that indicate system/internal content, not real user requests
NOISE_USER_PATTERNS = [
"This session is being continued",
"<task-notification>",
"<system-reminder>",
]
def normalize_path(project_path: str) -> str:
"""Convert absolute path to Claude's normalized directory name."""
return project_path.replace("/", "-")
def find_project_dir(project_path: str) -> Optional[Path]:
"""Find the Claude projects directory for a given project path."""
abs_path = os.path.abspath(project_path)
# If the path is already inside ~/.claude/projects/, use it directly
projects_str = str(PROJECTS_DIR) + "/"
if abs_path.startswith(projects_str):
candidate = Path(abs_path)
if candidate.is_dir():
return candidate
rel = abs_path[len(projects_str):]
top_dir = PROJECTS_DIR / rel.split("/")[0]
if top_dir.is_dir():
return top_dir
normalized = normalize_path(abs_path)
candidate = PROJECTS_DIR / normalized
if candidate.is_dir():
return candidate
# Fallback: search for partial match
for d in PROJECTS_DIR.iterdir():
if d.is_dir() and normalized in d.name:
return d
return None
def load_sessions_index(project_dir: Path) -> List[Dict]:
"""Load and parse sessions-index.json, sorted by modified desc."""
index_file = project_dir / "sessions-index.json"
if not index_file.exists():
return []
with open(index_file, encoding="utf-8") as f:
data = json.load(f)
entries = data.get("entries", [])
entries.sort(key=lambda e: e.get("modified", ""), reverse=True)
return entries
def search_sessions(entries: List[Dict], query: str) -> List[Dict]:
"""Search sessions by keyword in firstPrompt and summary."""
query_lower = query.lower()
results = []
for entry in entries:
first_prompt = (entry.get("firstPrompt") or "").lower()
summary = (entry.get("summary") or "").lower()
if query_lower in first_prompt or query_lower in summary:
results.append(entry)
return results
def format_session_entry(entry: Dict, file_exists: bool = True) -> str:
"""Format a session index entry for display."""
sid = entry.get("sessionId", "?")
modified = entry.get("modified", "?")
msgs = entry.get("messageCount", "?")
branch = entry.get("gitBranch", "?")
prompt = (entry.get("firstPrompt") or "")[:80]
ghost = "" if file_exists else " [file missing]"
return f" {sid} [{branch}] {msgs} msgs {modified}{ghost}\n {prompt}"
# ── Session file parsing ────────────────────────────────────────────
def parse_session_structure(session_file: Path) -> Dict:
"""Parse a session JSONL file and return structured data."""
file_size = session_file.stat().st_size
total_lines = 0
# First pass: find compact boundaries and count lines
compact_boundaries = [] # (line_num, summary_text)
with open(session_file, encoding="utf-8") as f:
prev_boundary_line = None
for i, raw_line in enumerate(f):
total_lines += 1
# Detect compact summary via isCompactSummary flag (most reliable)
if '"isCompactSummary"' in raw_line:
try:
obj = json.loads(raw_line)
if obj.get("isCompactSummary"):
content = obj.get("message", {}).get("content", "")
if isinstance(content, str):
boundary_line = prev_boundary_line if prev_boundary_line is not None else max(0, i - 1)
compact_boundaries.append((boundary_line, content))
prev_boundary_line = None
continue
except json.JSONDecodeError:
pass
# Detect compact_boundary marker
if '"compact_boundary"' in raw_line and '"subtype"' in raw_line:
try:
obj = json.loads(raw_line)
if obj.get("subtype") == "compact_boundary":
prev_boundary_line = i
continue
except json.JSONDecodeError:
pass
# Fallback: if prev line was boundary and this is a user message with long string content
if prev_boundary_line is not None:
try:
obj = json.loads(raw_line)
content = obj.get("message", {}).get("content", "")
if isinstance(content, str) and len(content) > 100:
compact_boundaries.append((prev_boundary_line, content))
except (json.JSONDecodeError, AttributeError):
compact_boundaries.append((prev_boundary_line, ""))
prev_boundary_line = None
# Determine hot zone: everything after last compact boundary + its summary
if compact_boundaries:
last_boundary_line = compact_boundaries[-1][0]
hot_zone_start = last_boundary_line + 2 # skip boundary + summary
else:
# No compact boundaries: use size-adaptive strategy
if file_size < 500_000: # <500KB: read last 60%
hot_zone_start = max(0, int(total_lines * 0.4))
elif file_size < 5_000_000: # <5MB: read last 30%
hot_zone_start = max(0, int(total_lines * 0.7))
else: # >5MB: read last 15%
hot_zone_start = max(0, int(total_lines * 0.85))
# Second pass: extract hot zone messages
messages = []
unresolved_tool_calls = {} # tool_use_id -> tool_use_info
errors = []
files_touched = set()
last_message_role = None
error_count = 0
with open(session_file, encoding="utf-8") as f:
for i, raw_line in enumerate(f):
if i < hot_zone_start:
continue
try:
obj = json.loads(raw_line)
except json.JSONDecodeError:
continue
msg_type = obj.get("type", "")
if msg_type in NOISE_TYPES:
continue
if msg_type == "system":
subtype = obj.get("subtype", "")
if subtype in NOISE_SUBTYPES:
if subtype == "api_error":
error_count += 1
continue
if subtype == "compact_boundary":
continue
# Track tool calls and results
msg = obj.get("message", {})
role = msg.get("role", "")
content = msg.get("content", "")
# Extract tool_use from assistant messages
if role == "assistant" and isinstance(content, list):
for block in content:
if not isinstance(block, dict) or block.get("type") != "tool_use":
continue
tool_id = block.get("id", "")
tool_name = block.get("name", "?")
inp = block.get("input", {})
unresolved_tool_calls[tool_id] = {
"name": tool_name,
"input_preview": str(inp)[:200],
}
# Track file operations
if tool_name in ("Write", "Edit", "Read"):
fp = inp.get("file_path", "")
if fp:
files_touched.add(fp)
elif tool_name == "Bash":
cmd = inp.get("command", "")
for match in re.findall(r'(?<!\w)(/[a-zA-Z][\w./\-]+)', cmd):
if not match.startswith("/dev/"):
files_touched.add(match)
# Resolve tool results
if role == "user" and isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
tool_id = block.get("tool_use_id", "")
unresolved_tool_calls.pop(tool_id, None)
is_error = block.get("is_error", False)
result_content = block.get("content", "")
if is_error and isinstance(result_content, str):
errors.append(result_content[:500])
# Track last message for end-reason detection
if role in ("user", "assistant"):
last_message_role = role
messages.append(obj)
# Detect session end reason
end_reason = _detect_end_reason(
last_message_role, unresolved_tool_calls, error_count,
)
return {
"total_lines": total_lines,
"file_size": file_size,
"compact_boundaries": compact_boundaries,
"hot_zone_start": hot_zone_start,
"messages": messages,
"unresolved_tool_calls": dict(unresolved_tool_calls),
"errors": errors,
"error_count": error_count,
"files_touched": files_touched,
"end_reason": end_reason,
}
def _detect_end_reason(
last_role: Optional[str],
unresolved: Dict,
error_count: int,
) -> str:
"""Detect why the session ended."""
if unresolved:
return "interrupted" # Tool calls dispatched but no results — likely ctrl-c
if error_count >= 3:
return "error_cascade" # Multiple API errors suggest systemic failure
if last_role == "assistant":
return "completed" # Assistant had the last word — clean end
if last_role == "user":
return "abandoned" # User sent a message but got no response
return "unknown"
def _is_noise_user_text(text: str) -> bool:
"""Check if user text is system noise rather than a real request."""
for pattern in NOISE_USER_PATTERNS:
if text.startswith(pattern) or pattern in text[:200]:
return True
return False
def extract_user_text(messages: List[Dict], limit: int = 5) -> List[str]:
"""Extract the last N user text messages (not tool results or system noise)."""
user_texts = []
for msg_obj in reversed(messages):
if msg_obj.get("isCompactSummary"):
continue
msg = msg_obj.get("message", {})
if msg.get("role") != "user":
continue
content = msg.get("content", "")
if isinstance(content, str) and content.strip():
if _is_noise_user_text(content):
continue
user_texts.append(content.strip())
elif isinstance(content, list):
texts = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
combined = "\n".join(t for t in texts if t.strip())
if combined and not _is_noise_user_text(combined):
user_texts.append(combined)
if len(user_texts) >= limit:
break
user_texts.reverse()
return user_texts
def extract_assistant_text(messages: List[Dict], limit: int = 3) -> List[str]:
"""Extract the last N assistant text responses (no thinking/tool_use)."""
assistant_texts = []
for msg_obj in reversed(messages):
msg = msg_obj.get("message", {})
if msg.get("role") != "assistant":
continue
content = msg.get("content", "")
if isinstance(content, list):
texts = [
b.get("text", "")
for b in content
if isinstance(b, dict) and b.get("type") == "text"
]
combined = "\n".join(t for t in texts if t.strip())
if combined:
assistant_texts.append(combined[:2000])
if len(assistant_texts) >= limit:
break
assistant_texts.reverse()
return assistant_texts
# ── Subagent extraction ──────────────────────────────────────────────
def extract_subagent_context(session_file: Path) -> List[Dict]:
"""Extract subagent summaries from session subdirectories.
Returns list of {name, type, status, last_text, is_interrupted}.
"""
session_dir = session_file.parent / session_file.stem
subagents_dir = session_dir / "subagents"
if not subagents_dir.is_dir():
return []
# Group by agent ID: find meta.json and .jsonl pairs
agent_ids = set()
for f in subagents_dir.iterdir():
if f.suffix == ".jsonl":
agent_ids.add(f.stem)
results = []
for agent_id in sorted(agent_ids):
jsonl_file = subagents_dir / f"{agent_id}.jsonl"
meta_file = subagents_dir / f"{agent_id}.meta.json"
# Parse agent type from ID (format: agent-a<type>-<hash> or agent-a<hash>)
agent_type = "unknown"
if meta_file.exists():
try:
with open(meta_file, encoding="utf-8") as f:
meta = json.load(f)
agent_type = meta.get("type", meta.get("subagent_type", "unknown"))
except (json.JSONDecodeError, OSError):
pass
if agent_type == "unknown":
# Infer from ID pattern: agent-a<type>-<hash>
match = re.match(r'agent-a(compact|prompt_suggestion|[a-z_]+)-', agent_id)
if match:
agent_type = match.group(1)
# Skip compact and prompt_suggestion agents (internal, not user work)
if agent_type in ("compact", "prompt_suggestion"):
continue
# Read last few lines for final output
last_text = ""
is_interrupted = False
line_count = 0
if jsonl_file.exists():
try:
lines = jsonl_file.read_text(encoding="utf-8").strip().split("\n")
line_count = len(lines)
has_tool_use_pending = False
# Check last 10 lines for final assistant text
for raw_line in reversed(lines[-10:]):
try:
obj = json.loads(raw_line)
msg = obj.get("message", {})
role = msg.get("role", "")
content = msg.get("content", "")
if role == "assistant" and isinstance(content, list):
for block in content:
if not isinstance(block, dict):
continue
block_type = block.get("type")
if block_type == "tool_use":
has_tool_use_pending = True
elif block_type == "text":
text = block.get("text", "")
if text.strip() and not last_text:
last_text = text.strip()[:500]
if role == "user" and isinstance(content, list):
for block in content:
if isinstance(block, dict) and block.get("type") == "tool_result":
has_tool_use_pending = False
except json.JSONDecodeError:
continue
is_interrupted = has_tool_use_pending
except OSError:
pass
results.append({
"id": agent_id,
"type": agent_type,
"last_text": last_text,
"is_interrupted": is_interrupted,
"lines": line_count,
})
return results
# ── Context sources ──────────────────────────────────────────────────
def get_git_state(project_path: str) -> str:
"""Get current git status and recent log."""
parts = []
try:
branch = subprocess.run(
["git", "branch", "--show-current"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, cwd=project_path, timeout=5,
)
if branch.stdout.strip():
parts.append(f"**Current branch**: `{branch.stdout.strip()}`")
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
try:
status = subprocess.run(
["git", "status", "--short"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, cwd=project_path, timeout=10,
)
if status.stdout.strip():
parts.append(f"### git status\n```\n{status.stdout.strip()}\n```")
else:
parts.append("### git status\nClean working tree.")
except (subprocess.TimeoutExpired, FileNotFoundError):
parts.append("### git status\n(unavailable)")
try:
log = subprocess.run(
["git", "log", "--oneline", "-5"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, cwd=project_path, timeout=10,
)
if log.stdout.strip():
parts.append(f"### git log (last 5)\n```\n{log.stdout.strip()}\n```")
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return "\n\n".join(parts)
def get_memory_md(project_dir: Path) -> Optional[str]:
"""Read MEMORY.md if it exists in the project's memory directory."""
memory_dir = project_dir / "memory"
memory_file = memory_dir / "MEMORY.md"
if memory_file.exists():
content = memory_file.read_text(encoding="utf-8").strip()
if content:
return content[:3000]
return None
def get_session_memory(session_file: Path) -> Optional[str]:
"""Read session-memory/summary.md if it exists (newer CC versions)."""
session_dir = session_file.parent / session_file.stem
summary = session_dir / "session-memory" / "summary.md"
if summary.exists():
content = summary.read_text(encoding="utf-8").strip()
if content:
return content[:3000]
return None
# ── Output formatting ────────────────────────────────────────────────
END_REASON_LABELS = {
"completed": "Clean exit (assistant completed response)",
"interrupted": "Interrupted (unresolved tool calls — likely ctrl-c or timeout)",
"error_cascade": "Error cascade (multiple API errors)",
"abandoned": "Abandoned (user message with no response)",
"unknown": "Unknown",
}
def build_briefing(
session_entry: Optional[Dict],
parsed: Dict,
project_path: str,
project_dir: Path,
session_file: Path,
) -> str:
"""Build the structured Markdown briefing."""
sections = []
# Header
sections.append("# Resume Context Briefing\n")
# Session metadata
if session_entry:
sid = session_entry.get("sessionId", "?")
modified = session_entry.get("modified", "?")
branch = session_entry.get("gitBranch", "?")
msg_count = session_entry.get("messageCount", "?")
first_prompt = session_entry.get("firstPrompt", "")
summary = session_entry.get("summary", "")
sections.append("## Session Info\n")
sections.append(f"- **ID**: `{sid}`")
sections.append(f"- **Last active**: {modified}")
sections.append(f"- **Branch**: `{branch}`")
sections.append(f"- **Messages**: {msg_count}")
sections.append(f"- **First prompt**: {first_prompt}")
if summary:
sections.append(f"- **Summary**: {summary[:300]}")
# File stats + end reason
file_mb = parsed["file_size"] / 1_000_000
end_label = END_REASON_LABELS.get(parsed["end_reason"], parsed["end_reason"])
sections.append(f"\n**Session file**: {file_mb:.1f} MB, {parsed['total_lines']} lines, "
f"{len(parsed['compact_boundaries'])} compaction(s)")
sections.append(f"**Session end reason**: {end_label}")
if parsed["error_count"] > 0:
sections.append(f"**API errors**: {parsed['error_count']}")
# Session memory (newer CC versions generate this automatically)
session_mem = get_session_memory(session_file)
if session_mem:
sections.append("\n## Session Memory (auto-generated by Claude Code)\n")
sections.append(session_mem)
# Compact summary (highest-signal context)
if parsed["compact_boundaries"]:
last_summary = parsed["compact_boundaries"][-1][1]
if last_summary:
# Allow up to 8K chars — this is the highest-signal content
display = last_summary[:8000]
if len(last_summary) > 8000:
display += f"\n\n... (truncated, full summary: {len(last_summary)} chars)"
sections.append("\n## Compact Summary (auto-generated by previous session)\n")
sections.append(display)
# Last user requests
user_texts = extract_user_text(parsed["messages"])
if user_texts:
sections.append("\n## Last User Requests\n")
for i, text in enumerate(user_texts, 1):
display = text[:500]
if len(text) > 500:
display += "..."
sections.append(f"### Request {i}\n{display}\n")
# Last assistant responses
assistant_texts = extract_assistant_text(parsed["messages"])
if assistant_texts:
sections.append("\n## Last Assistant Responses\n")
for i, text in enumerate(assistant_texts, 1):
display = text[:1000]
if len(text) > 1000:
display += "..."
sections.append(f"### Response {i}\n{display}\n")
# Errors encountered
if parsed["errors"]:
sections.append("\n## Errors Encountered\n")
seen = set()
for err in parsed["errors"]:
short = err[:200]
if short not in seen:
seen.add(short)
sections.append(f"```\n{err}\n```\n")
# Unresolved tool calls (interrupted session)
if parsed["unresolved_tool_calls"]:
sections.append("\n## Unresolved Tool Calls (session was interrupted)\n")
for tool_id, info in parsed["unresolved_tool_calls"].items():
sections.append(f"- **{info['name']}**: `{tool_id}`")
sections.append(f" Input: {info['input_preview']}")
# Subagent context (the "nobody has done this" feature)
subagents = extract_subagent_context(session_file)
if subagents:
interrupted = [s for s in subagents if s["is_interrupted"]]
completed = [s for s in subagents if not s["is_interrupted"]]
sections.append(f"\n## Subagent Workflow ({len(completed)} completed, {len(interrupted)} interrupted)\n")
if interrupted:
sections.append("### Interrupted Subagents\n")
for sa in interrupted:
sections.append(f"- **{sa['type']}** (`{sa['id']}`, {sa['lines']} lines)")
if sa["last_text"]:
sections.append(f" Last output: {sa['last_text'][:300]}")
if completed:
sections.append("\n### Completed Subagents\n")
for sa in completed:
sections.append(f"- **{sa['type']}** (`{sa['id']}`, {sa['lines']} lines)")
if sa["last_text"]:
sections.append(f" Last output: {sa['last_text'][:200]}")
# Files touched in session
if parsed["files_touched"]:
sections.append("\n## Files Touched in Session\n")
for fp in sorted(parsed["files_touched"])[:30]:
sections.append(f"- `{fp}`")
# MEMORY.md
memory = get_memory_md(project_dir)
if memory:
sections.append("\n## Persistent Memory (MEMORY.md)\n")
sections.append(memory)
# Git state
sections.append("\n## Current Workspace State\n")
sections.append(get_git_state(project_path))
return "\n".join(sections)
# ── CLI ──────────────────────────────────────────────────────────────
def _check_session_files(entries: List[Dict], project_dir: Path) -> Dict[str, bool]:
"""Check which index entries have actual files on disk."""
status = {}
for entry in entries:
sid = entry.get("sessionId", "")
session_file = project_dir / f"{sid}.jsonl"
if session_file.exists():
status[sid] = True
else:
full_path = entry.get("fullPath", "")
status[sid] = bool(full_path and Path(full_path).exists())
return status
def main():
parser = argparse.ArgumentParser(
description="Extract actionable resume context from Claude Code sessions.",
)
parser.add_argument(
"--project", "-p",
default=os.getcwd(),
help="Project path (default: current directory)",
)
parser.add_argument(
"--session", "-s",
default=None,
help="Session ID to extract context from",
)
parser.add_argument(
"--query", "-q",
default=None,
help="Search sessions by keyword in firstPrompt/summary",
)
parser.add_argument(
"--list", "-l",
action="store_true",
help="List recent sessions",
)
parser.add_argument(
"--limit", "-n",
type=int,
default=10,
help="Number of sessions to list (default: 10)",
)
parser.add_argument(
"--exclude-current",
default=None,
help="Session ID to exclude (typically the currently active session)",
)
args = parser.parse_args()
project_path = os.path.abspath(args.project)
project_dir = find_project_dir(project_path)
if not project_dir:
print(f"Error: no Claude session data found for {project_path}", file=sys.stderr)
print(f"Looked in: {PROJECTS_DIR / normalize_path(project_path)}", file=sys.stderr)
sys.exit(1)
entries = load_sessions_index(project_dir)
# ── List mode ──
if args.list:
# Show both index entries and actual files, with file-exists status
file_status = _check_session_files(entries, project_dir)
existing = sum(1 for v in file_status.values() if v)
missing = sum(1 for v in file_status.values() if not v)
if not entries and not list(project_dir.glob("*.jsonl")):
print("No sessions found.")
sys.exit(0)
print(f"Sessions for {project_path}:\n")
if missing > 0:
print(f" (index has {len(entries)} entries, {existing} with files, {missing} with missing files)\n")
for entry in entries[:args.limit]:
sid = entry.get("sessionId", "")
exists = file_status.get(sid, False)
print(format_session_entry(entry, file_exists=exists))
print()
# Also show files NOT in index
indexed_ids = {e.get("sessionId") for e in entries}
orphan_files = [
f for f in sorted(project_dir.glob("*.jsonl"), key=lambda f: f.stat().st_mtime, reverse=True)
if f.stem not in indexed_ids
]
if orphan_files:
print(f" Files not in index ({len(orphan_files)}):")
for f in orphan_files[:5]:
size_kb = f.stat().st_size / 1000
print(f" {f.stem} ({size_kb:.0f} KB)")
print()
sys.exit(0)
# ── Query mode ──
if args.query:
results = search_sessions(entries, args.query)
if not results:
print(f"No sessions matching '{args.query}'.", file=sys.stderr)
sys.exit(1)
print(f"Sessions matching '{args.query}' ({len(results)} found):\n")
for entry in results[: args.limit]:
print(format_session_entry(entry))
print()
if len(results) == 1:
args.session = results[0]["sessionId"]
else:
sys.exit(0)
# ── Extract mode ──
session_id = args.session
session_entry = None
if session_id:
for entry in entries:
if entry.get("sessionId") == session_id:
session_entry = entry
break
session_file = project_dir / f"{session_id}.jsonl"
if not session_file.exists():
if session_entry:
full_path = session_entry.get("fullPath", "")
if full_path and Path(full_path).exists():
session_file = Path(full_path)
if not session_file.exists():
for jsonl in project_dir.glob("*.jsonl"):
if session_id in jsonl.name:
session_file = jsonl
break
else:
print(f"Error: session file not found for {session_id}", file=sys.stderr)
sys.exit(1)
else:
# Use latest session — prefer actual files, skip current session
jsonl_files = sorted(
project_dir.glob("*.jsonl"),
key=lambda f: f.stat().st_mtime,
reverse=True,
)
if args.exclude_current:
jsonl_files = [f for f in jsonl_files if f.stem != args.exclude_current]
if not jsonl_files:
print("Error: no session files found.", file=sys.stderr)
sys.exit(1)
# Skip the most recent file if it's likely the current active session
# (modified within the last 60 seconds and no explicit session requested)
if len(jsonl_files) > 1:
newest = jsonl_files[0]
age_seconds = time.time() - newest.stat().st_mtime
if age_seconds < 60:
print(f"Skipping active session {newest.stem} (modified {age_seconds:.0f}s ago)",
file=sys.stderr)
jsonl_files = jsonl_files[1:]
session_file = jsonl_files[0]
session_id = session_file.stem
for entry in entries:
if entry.get("sessionId") == session_id:
session_entry = entry
break
# Parse and build briefing
print(f"Parsing session {session_id} ({session_file.stat().st_size / 1_000_000:.1f} MB)...",
file=sys.stderr)
parsed = parse_session_structure(session_file)
briefing = build_briefing(session_entry, parsed, project_path, project_dir, session_file)
print(briefing)
if __name__ == "__main__":
main()