style: Format all Python files with ruff
- Formatted 103 files to comply with ruff format requirements - No code logic changes, only formatting/whitespace - Fixes CI formatting check failures
This commit is contained in:
@@ -17,6 +17,7 @@ import time
|
||||
@dataclass
|
||||
class ChunkMetadata:
|
||||
"""Metadata for a document chunk."""
|
||||
|
||||
chunk_id: str
|
||||
source: str
|
||||
category: str
|
||||
@@ -30,6 +31,7 @@ class ChunkMetadata:
|
||||
@dataclass
|
||||
class IngestionProgress:
|
||||
"""Progress tracking for streaming ingestion."""
|
||||
|
||||
total_documents: int
|
||||
processed_documents: int
|
||||
total_chunks: int
|
||||
@@ -81,7 +83,7 @@ class StreamingIngester:
|
||||
chunk_size: int = 4000,
|
||||
chunk_overlap: int = 200,
|
||||
batch_size: int = 100,
|
||||
max_memory_mb: int = 500
|
||||
max_memory_mb: int = 500,
|
||||
):
|
||||
"""
|
||||
Initialize streaming ingester.
|
||||
@@ -103,7 +105,7 @@ class StreamingIngester:
|
||||
content: str,
|
||||
metadata: dict,
|
||||
chunk_size: int | None = None,
|
||||
chunk_overlap: int | None = None
|
||||
chunk_overlap: int | None = None,
|
||||
) -> Iterator[tuple[str, ChunkMetadata]]:
|
||||
"""
|
||||
Split document into overlapping chunks.
|
||||
@@ -130,7 +132,7 @@ class StreamingIngester:
|
||||
chunk_index=0,
|
||||
total_chunks=1,
|
||||
char_start=0,
|
||||
char_end=len(content)
|
||||
char_end=len(content),
|
||||
)
|
||||
yield content, chunk_meta
|
||||
return
|
||||
@@ -162,7 +164,7 @@ class StreamingIngester:
|
||||
chunk_index=i,
|
||||
total_chunks=total_chunks,
|
||||
char_start=start,
|
||||
char_end=end
|
||||
char_end=end,
|
||||
)
|
||||
|
||||
yield chunk_text, chunk_meta
|
||||
@@ -170,17 +172,12 @@ class StreamingIngester:
|
||||
def _generate_chunk_id(self, content: str, metadata: dict, chunk_index: int) -> str:
|
||||
"""Generate deterministic chunk ID."""
|
||||
id_string = (
|
||||
f"{metadata.get('source', '')}-"
|
||||
f"{metadata.get('file', '')}-"
|
||||
f"{chunk_index}-"
|
||||
f"{content[:50]}"
|
||||
f"{metadata.get('source', '')}-{metadata.get('file', '')}-{chunk_index}-{content[:50]}"
|
||||
)
|
||||
return hashlib.md5(id_string.encode()).hexdigest()
|
||||
|
||||
def stream_skill_directory(
|
||||
self,
|
||||
skill_dir: Path,
|
||||
callback: callable | None = None
|
||||
self, skill_dir: Path, callback: callable | None = None
|
||||
) -> Iterator[tuple[str, dict]]:
|
||||
"""
|
||||
Stream all documents from skill directory.
|
||||
@@ -218,7 +215,7 @@ class StreamingIngester:
|
||||
processed_chunks=0,
|
||||
failed_chunks=0,
|
||||
bytes_processed=0,
|
||||
start_time=time.time()
|
||||
start_time=time.time(),
|
||||
)
|
||||
|
||||
# Process each document
|
||||
@@ -235,11 +232,13 @@ class StreamingIngester:
|
||||
"category": category,
|
||||
"file": filename,
|
||||
"type": "documentation" if filename == "SKILL.md" else "reference",
|
||||
"version": "1.0.0"
|
||||
"version": "1.0.0",
|
||||
}
|
||||
|
||||
# Chunk document and yield chunks
|
||||
for chunk_count, (chunk_text, chunk_meta) in enumerate(self.chunk_document(content, metadata), start=1):
|
||||
for chunk_count, (chunk_text, chunk_meta) in enumerate(
|
||||
self.chunk_document(content, metadata), start=1
|
||||
):
|
||||
self.progress.total_chunks += 1
|
||||
|
||||
# Convert chunk metadata to dict
|
||||
@@ -272,9 +271,7 @@ class StreamingIngester:
|
||||
continue
|
||||
|
||||
def batch_iterator(
|
||||
self,
|
||||
chunks: Iterator[tuple[str, dict]],
|
||||
batch_size: int | None = None
|
||||
self, chunks: Iterator[tuple[str, dict]], batch_size: int | None = None
|
||||
) -> Iterator[list[tuple[str, dict]]]:
|
||||
"""
|
||||
Group chunks into batches for efficient processing.
|
||||
@@ -321,7 +318,7 @@ class StreamingIngester:
|
||||
"failed_chunks": self.progress.failed_chunks,
|
||||
"bytes_processed": self.progress.bytes_processed,
|
||||
},
|
||||
"state": state
|
||||
"state": state,
|
||||
}
|
||||
|
||||
checkpoint_path.write_text(json.dumps(checkpoint_data, indent=2))
|
||||
@@ -384,23 +381,25 @@ def main():
|
||||
parser = argparse.ArgumentParser(description="Stream and chunk skill documents")
|
||||
parser.add_argument("input", help="Input file or directory path")
|
||||
parser.add_argument("--chunk-size", type=int, default=4000, help="Chunk size in characters")
|
||||
parser.add_argument("--chunk-overlap", type=int, default=200, help="Chunk overlap in characters")
|
||||
parser.add_argument(
|
||||
"--chunk-overlap", type=int, default=200, help="Chunk overlap in characters"
|
||||
)
|
||||
parser.add_argument("--batch-size", type=int, default=100, help="Batch size for processing")
|
||||
parser.add_argument("--checkpoint", help="Checkpoint file path")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Initialize ingester
|
||||
ingester = StreamingIngester(
|
||||
chunk_size=args.chunk_size,
|
||||
chunk_overlap=args.chunk_overlap,
|
||||
batch_size=args.batch_size
|
||||
chunk_size=args.chunk_size, chunk_overlap=args.chunk_overlap, batch_size=args.batch_size
|
||||
)
|
||||
|
||||
# Progress callback
|
||||
def on_progress(progress: IngestionProgress):
|
||||
if progress.processed_chunks % 10 == 0:
|
||||
print(f"Progress: {progress.progress_percent:.1f}% - "
|
||||
f"{progress.processed_chunks}/{progress.total_chunks} chunks")
|
||||
print(
|
||||
f"Progress: {progress.progress_percent:.1f}% - "
|
||||
f"{progress.processed_chunks}/{progress.total_chunks} chunks"
|
||||
)
|
||||
|
||||
# Stream input
|
||||
input_path = Path(args.input)
|
||||
@@ -416,17 +415,23 @@ def main():
|
||||
metadata = {"source": input_path.stem, "file": input_path.name}
|
||||
file_chunks = ingester.chunk_document(content, metadata)
|
||||
# Convert to generator format matching stream_skill_directory
|
||||
chunks = ((text, {
|
||||
"content": text,
|
||||
"chunk_id": meta.chunk_id,
|
||||
"source": meta.source,
|
||||
"category": meta.category,
|
||||
"file": meta.file,
|
||||
"chunk_index": meta.chunk_index,
|
||||
"total_chunks": meta.total_chunks,
|
||||
"char_start": meta.char_start,
|
||||
"char_end": meta.char_end,
|
||||
}) for text, meta in file_chunks)
|
||||
chunks = (
|
||||
(
|
||||
text,
|
||||
{
|
||||
"content": text,
|
||||
"chunk_id": meta.chunk_id,
|
||||
"source": meta.source,
|
||||
"category": meta.category,
|
||||
"file": meta.file,
|
||||
"chunk_index": meta.chunk_index,
|
||||
"total_chunks": meta.total_chunks,
|
||||
"char_start": meta.char_start,
|
||||
"char_end": meta.char_end,
|
||||
},
|
||||
)
|
||||
for text, meta in file_chunks
|
||||
)
|
||||
|
||||
# Process in batches
|
||||
all_chunks = []
|
||||
@@ -437,8 +442,7 @@ def main():
|
||||
# Save checkpoint if specified
|
||||
if args.checkpoint:
|
||||
ingester.save_checkpoint(
|
||||
Path(args.checkpoint),
|
||||
{"processed_batches": len(all_chunks) // args.batch_size}
|
||||
Path(args.checkpoint), {"processed_batches": len(all_chunks) // args.batch_size}
|
||||
)
|
||||
|
||||
# Final progress
|
||||
@@ -449,4 +453,5 @@ def main():
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
sys.exit(main())
|
||||
|
||||
Reference in New Issue
Block a user