Files
antigravity-skills-reference/skills/context-agent/scripts/session_parser.py
ProgramadorBrasil 61ec71c5c7 feat: add 52 specialized AI agent skills (#217)
New skills covering 10 categories:

**Security & Audit**: 007 (STRIDE/PASTA/OWASP), cred-omega (secrets management)
**AI Personas**: Karpathy, Hinton, Sutskever, LeCun (4 sub-skills), Altman, Musk, Gates, Jobs, Buffett
**Multi-agent Orchestration**: agent-orchestrator, task-intelligence, multi-advisor
**Code Analysis**: matematico-tao (Terence Tao-inspired mathematical code analysis)
**Social & Messaging**: Instagram Graph API, Telegram Bot, WhatsApp Cloud API, social-orchestrator
**Image Generation**: AI Studio (Gemini), Stability AI, ComfyUI Gateway, image-studio router
**Brazilian Domain**: 6 auction specialist modules, 2 legal advisors, auctioneers data scraper
**Product & Growth**: design, invention, monetization, analytics, growth engine
**DevOps & LLM Ops**: Docker/CI-CD/AWS, RAG/embeddings/fine-tuning
**Skill Governance**: installer, sentinel auditor, context management

Each skill includes:
- Standardized YAML frontmatter (name, description, risk, source, tags, tools)
- Structured sections (Overview, When to Use, How it Works, Best Practices)
- Python scripts and reference documentation where applicable
- Cross-platform compatibility (Claude Code, Antigravity, Cursor, Gemini CLI, Codex CLI)

Co-authored-by: ProgramadorBrasil <214873561+ProgramadorBrasil@users.noreply.github.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 10:04:07 +01:00

207 lines
6.9 KiB
Python

"""
Parser dos logs JSONL do Claude Code.
Lê arquivos de sessão e extrai informações estruturadas.
"""
import json
from pathlib import Path
from datetime import datetime
from typing import Optional
from config import CLAUDE_SESSION_DIR, FILE_MODIFYING_TOOLS
from models import SessionEntry
def parse_session_file(path: Path) -> list[SessionEntry]:
"""Lê um arquivo JSONL e retorna lista de SessionEntry."""
entries = []
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
try:
raw = json.loads(line)
entry = _parse_raw_entry(raw)
if entry:
entries.append(entry)
except json.JSONDecodeError:
continue
return entries
def _parse_raw_entry(raw: dict) -> Optional[SessionEntry]:
"""Converte um dict JSON bruto em SessionEntry."""
entry_type = raw.get("type", "")
if entry_type == "queue-operation":
return SessionEntry(
type="queue",
timestamp=raw.get("timestamp", ""),
session_id=raw.get("sessionId", ""),
content=raw.get("content", ""),
)
if entry_type not in ("user", "assistant"):
return None
msg = raw.get("message", {})
role = msg.get("role", "")
slug = raw.get("slug", "")
session_id = raw.get("sessionId", "")
timestamp = raw.get("timestamp", "")
# Extrair texto e tool_calls do content
text_parts = []
tool_calls = []
files_modified = []
model = msg.get("model", "")
content = msg.get("content", "")
if isinstance(content, str):
text_parts.append(content)
elif isinstance(content, list):
for block in content:
if not isinstance(block, dict):
continue
block_type = block.get("type", "")
if block_type == "text":
text_parts.append(block.get("text", ""))
elif block_type == "tool_use":
tool_name = block.get("name", "")
tool_input = block.get("input", {})
tool_calls.append({"name": tool_name, "input": tool_input})
# Detectar arquivos modificados
if tool_name in FILE_MODIFYING_TOOLS:
fp = tool_input.get("file_path", "")
if fp:
files_modified.append({"path": fp, "action": tool_name.lower()})
elif block_type == "tool_result":
# Resultados de ferramentas (em mensagens do user)
result_content = block.get("content", "")
if isinstance(result_content, list):
for rc in result_content:
if isinstance(rc, dict) and rc.get("type") == "text":
text_parts.append(rc.get("text", ""))
elif isinstance(result_content, str):
text_parts.append(result_content)
# Token usage
usage = msg.get("usage", {})
token_usage = {}
if usage:
token_usage = {
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
"cache_read": usage.get("cache_read_input_tokens", 0),
"cache_creation": usage.get("cache_creation_input_tokens", 0),
}
return SessionEntry(
type=entry_type,
timestamp=timestamp,
session_id=session_id,
slug=slug,
role=role,
content="\n".join(text_parts),
tool_calls=tool_calls,
token_usage=token_usage,
model=model,
files_modified=files_modified,
)
def extract_user_messages(entries: list[SessionEntry]) -> list[str]:
"""Extrai apenas o texto das mensagens do usuário."""
return [e.content for e in entries if e.role == "user" and e.content.strip()]
def extract_assistant_messages(entries: list[SessionEntry]) -> list[str]:
"""Extrai apenas o texto das respostas do assistente."""
return [e.content for e in entries if e.role == "assistant" and e.content.strip()]
def extract_tool_calls(entries: list[SessionEntry]) -> list[dict]:
"""Extrai todas as chamadas de ferramentas."""
calls = []
for e in entries:
calls.extend(e.tool_calls)
return calls
def extract_files_modified(entries: list[SessionEntry]) -> list[dict]:
"""Extrai lista de arquivos modificados (sem duplicatas)."""
seen = set()
files = []
for e in entries:
for f in e.files_modified:
key = f["path"]
if key not in seen:
seen.add(key)
files.append(f)
return files
def get_session_metadata(entries: list[SessionEntry]) -> dict:
"""Extrai metadados da sessão: slug, timestamps, modelo, tokens."""
if not entries:
return {}
timestamps = [e.timestamp for e in entries if e.timestamp]
slugs = [e.slug for e in entries if e.slug]
models = [e.model for e in entries if e.model]
total_input = sum(e.token_usage.get("input", 0) for e in entries)
total_output = sum(e.token_usage.get("output", 0) for e in entries)
total_cache = sum(e.token_usage.get("cache_read", 0) for e in entries)
user_msgs = [e for e in entries if e.role == "user"]
assistant_msgs = [e for e in entries if e.role == "assistant"]
# Calcular duração
duration_minutes = 0
if len(timestamps) >= 2:
try:
t_start = datetime.fromisoformat(timestamps[0].replace("Z", "+00:00"))
t_end = datetime.fromisoformat(timestamps[-1].replace("Z", "+00:00"))
duration_minutes = int((t_end - t_start).total_seconds() / 60)
except (ValueError, IndexError):
pass
return {
"slug": slugs[0] if slugs else "",
"session_id": entries[0].session_id if entries else "",
"start_time": timestamps[0] if timestamps else "",
"end_time": timestamps[-1] if timestamps else "",
"duration_minutes": duration_minutes,
"model": models[0] if models else "",
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_cache_tokens": total_cache,
"message_count": len(user_msgs) + len(assistant_msgs),
"tool_call_count": sum(len(e.tool_calls) for e in entries),
}
def get_latest_session_file() -> Optional[Path]:
"""Encontra o arquivo JSONL mais recente."""
if not CLAUDE_SESSION_DIR.exists():
return None
jsonl_files = sorted(
CLAUDE_SESSION_DIR.glob("*.jsonl"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
return jsonl_files[0] if jsonl_files else None
def get_all_session_files() -> list[Path]:
"""Retorna todos os arquivos JSONL ordenados por data de modificação."""
if not CLAUDE_SESSION_DIR.exists():
return []
return sorted(
CLAUDE_SESSION_DIR.glob("*.jsonl"),
key=lambda p: p.stat().st_mtime,
reverse=True,
)