#!/usr/bin/env python3
"""
Streaming Adaptor Mixin

Provides streaming ingestion capabilities for platform adaptors.
Enables memory-efficient processing of large documentation sets.
"""

import json
import sys
from pathlib import Path
from typing import Any, Callable, Optional

# Make sibling CLI modules importable when this file is run directly.
sys.path.insert(0, str(Path(__file__).parent.parent))

from streaming_ingest import StreamingIngester, IngestionProgress


class StreamingAdaptorMixin:
    """
    Mixin class to add streaming capabilities to platform adaptors.

    Host classes are expected to define ``PLATFORM`` (short identifier used
    in output file names) and ``PLATFORM_NAME`` (human-readable label).

    Provides:
    - Chunked document processing
    - Memory-efficient streaming
    - Progress tracking
    - Batch processing
    - Resume capability
    """

    def package_streaming(
        self,
        skill_dir: Path,
        output_path: Path,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        batch_size: int = 100,
        progress_callback: Optional[Callable[[IngestionProgress], None]] = None
    ) -> Path:
        """
        Package skill using streaming ingestion.

        Memory-efficient alternative to the standard package() method.
        Suitable for large documentation sets (>100 documents or >10MB).

        Args:
            skill_dir: Path to skill directory
            output_path: Output path/filename
            chunk_size: Maximum characters per chunk
            chunk_overlap: Overlap between chunks (for context)
            batch_size: Number of chunks per batch
            progress_callback: Optional callback(progress: IngestionProgress)

        Returns:
            Path to created package file
        """
        skill_dir = Path(skill_dir)
        output_path = Path(output_path)

        # Initialize streaming ingester
        ingester = StreamingIngester(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            batch_size=batch_size
        )

        print("\nšŸ“Š Streaming ingestion starting...")
        print(f"   Chunk size: {chunk_size} chars")
        print(f"   Overlap: {chunk_overlap} chars")
        print(f"   Batch size: {batch_size} chunks")

        # Throttle console output to every 10 chunks, but forward every
        # update to the caller-supplied callback.
        last_update = 0

        def on_progress(progress: IngestionProgress) -> None:
            nonlocal last_update
            if progress.processed_chunks - last_update >= 10:
                print(f"   {progress.progress_percent:.1f}% - "
                      f"{progress.processed_chunks}/{progress.total_chunks} chunks "
                      f"({progress.chunks_per_second:.1f} chunks/sec)")
                last_update = progress.processed_chunks

            if progress_callback:
                progress_callback(progress)

        # Stream and collect chunks. Files are read one at a time; memory is
        # bounded by the chunk list, not by whole-directory buffers.
        chunks = ingester.stream_skill_directory(skill_dir, callback=on_progress)
        all_chunks = list(chunks)

        print("\nāœ… Streaming ingestion complete!")
        print(f"   Total chunks: {len(all_chunks)}")
        print(f"   Total bytes: {ingester.progress.bytes_processed:,}")
        print(f"   Time: {ingester.progress.elapsed_time:.1f}s")
        print(f"   Rate: {ingester.progress.chunks_per_second:.1f} chunks/sec")

        # Convert chunks to platform format
        print(f"\nšŸ“¦ Converting to {self.PLATFORM_NAME} format...")
        package_data = self._convert_chunks_to_platform_format(
            all_chunks,
            skill_dir.name
        )

        # Determine output filename
        if output_path.is_dir() or str(output_path).endswith("/"):
            output_path = output_path / f"{skill_dir.name}-{self.PLATFORM}-streaming.json"
        elif not str(output_path).endswith(".json"):
            # Normalize archive-style names to .json and tag with the platform.
            output_str = str(output_path).replace(".zip", ".json").replace(".tar.gz", ".json")
            if f"-{self.PLATFORM}" not in output_str:
                output_str = output_str.replace(".json", f"-{self.PLATFORM}.json")
            output_path = Path(output_str)

        # Write output
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text(
            json.dumps(package_data, indent=2, ensure_ascii=False),
            encoding="utf-8"
        )

        print(f"āœ… Package created: {output_path}")
        print(f"   Size: {output_path.stat().st_size:,} bytes")

        return output_path

    def _convert_chunks_to_platform_format(
        self,
        chunks: list[tuple[str, dict]],
        skill_name: str
    ) -> dict:
        """
        Convert chunks to platform-specific format.

        Subclasses should override this method to customize the format.

        Args:
            chunks: List of (chunk_text, chunk_metadata) tuples
            skill_name: Name of the skill

        Returns:
            Platform-specific data structure
        """
        # Default implementation: generic parallel-array format.
        documents = []
        metadatas = []
        ids = []

        for chunk_text, chunk_meta in chunks:
            documents.append(chunk_text)
            metadatas.append(chunk_meta)
            ids.append(chunk_meta["chunk_id"])

        return {
            "skill_name": skill_name,
            "documents": documents,
            "metadatas": metadatas,
            "ids": ids,
            "total_chunks": len(chunks),
            "streaming": True
        }

    @staticmethod
    def _estimated_chunk_count(char_count: int, chunk_size: int, chunk_overlap: int) -> int:
        """Estimate how many overlapping chunks a document of char_count characters needs."""
        return max(1, (char_count - chunk_overlap) // (chunk_size - chunk_overlap) + 1)

    def estimate_chunks(
        self,
        skill_dir: Path,
        chunk_size: int = 4000,
        chunk_overlap: int = 200
    ) -> dict[str, Any]:
        """
        Estimate chunking for a skill directory.

        Useful for planning and validation before processing. Performs no
        chunking itself; only file sizes are inspected.

        Args:
            skill_dir: Path to skill directory
            chunk_size: Maximum characters per chunk
            chunk_overlap: Overlap between chunks

        Returns:
            Estimation statistics
        """
        skill_dir = Path(skill_dir)

        # Collect candidate documents in the same order the streaming
        # ingester processes them: SKILL.md first, then sorted references.
        candidates: list[tuple[str, Path]] = []

        skill_md = skill_dir / "SKILL.md"
        if skill_md.exists():
            candidates.append(("SKILL.md", skill_md))

        refs_dir = skill_dir / "references"
        if refs_dir.exists():
            for ref_file in sorted(refs_dir.glob("*.md")):
                if ref_file.is_file() and not ref_file.name.startswith("."):
                    candidates.append((ref_file.name, ref_file))

        total_docs = 0
        total_chars = 0
        estimated_chunks = 0
        file_stats = []

        for display_name, filepath in candidates:
            char_count = len(filepath.read_text(encoding="utf-8"))
            chunk_count = self._estimated_chunk_count(char_count, chunk_size, chunk_overlap)

            total_docs += 1
            total_chars += char_count
            estimated_chunks += chunk_count

            file_stats.append({
                "file": display_name,
                "chars": char_count,
                "estimated_chunks": chunk_count
            })

        return {
            "skill_name": skill_dir.name,
            "total_documents": total_docs,
            "total_characters": total_chars,
            "estimated_chunks": estimated_chunks,
            "chunk_size": chunk_size,
            "chunk_overlap": chunk_overlap,
            "file_stats": file_stats,
            "estimated_memory_mb": (total_chars * 2) / (1024 * 1024),  # rough in-memory estimate
            "recommended_streaming": total_chars > 1_000_000 or total_docs > 100
        }
"LangChain (Streaming)" + + def _convert_chunks_to_platform_format(self, chunks, skill_name): + """Convert chunks to LangChain Document format.""" + documents = [] + + for chunk_text, chunk_meta in chunks: + documents.append({ + "page_content": chunk_text, + "metadata": { + "source": chunk_meta["source"], + "category": chunk_meta["category"], + "file": chunk_meta["file"], + "chunk_id": chunk_meta["chunk_id"], + "chunk_index": chunk_meta["chunk_index"], + "total_chunks": chunk_meta["total_chunks"], + "type": chunk_meta.get("type", "documentation"), + "version": chunk_meta.get("version", "1.0.0"), + } + }) + + return { + "documents": documents, + "total_chunks": len(chunks), + "streaming": True, + "format": "LangChain Document" + } + + +class StreamingChromaAdaptor(StreamingAdaptorMixin): + """Chroma adaptor with streaming support.""" + + PLATFORM = "chroma" + PLATFORM_NAME = "Chroma (Streaming)" + + def _convert_chunks_to_platform_format(self, chunks, skill_name): + """Convert chunks to Chroma format.""" + documents = [] + metadatas = [] + ids = [] + + for chunk_text, chunk_meta in chunks: + documents.append(chunk_text) + metadatas.append({ + "source": chunk_meta["source"], + "category": chunk_meta["category"], + "file": chunk_meta["file"], + "chunk_index": chunk_meta["chunk_index"], + "total_chunks": chunk_meta["total_chunks"], + "type": chunk_meta.get("type", "documentation"), + }) + ids.append(chunk_meta["chunk_id"]) + + return { + "documents": documents, + "metadatas": metadatas, + "ids": ids, + "collection_name": skill_name.replace("_", "-"), + "total_chunks": len(chunks), + "streaming": True + } + + +def demo_streaming(): + """Demonstrate streaming ingestion.""" + from pathlib import Path + + # Example with LangChain + adaptor = StreamingLangChainAdaptor() + + # Estimate first + print("=" * 60) + print("ESTIMATION") + print("=" * 60) + + skill_dir = Path("output/ansible") + estimate = adaptor.estimate_chunks(skill_dir, chunk_size=2000, chunk_overlap=100) + + 
print(f"\nSkill: {estimate['skill_name']}") + print(f"Documents: {estimate['total_documents']}") + print(f"Characters: {estimate['total_characters']:,}") + print(f"Estimated chunks: {estimate['estimated_chunks']}") + print(f"Estimated memory: {estimate['estimated_memory_mb']:.2f} MB") + print(f"Streaming recommended: {estimate['recommended_streaming']}") + + print("\nFile breakdown:") + for stat in estimate["file_stats"]: + print(f" {stat['file']}: {stat['chars']:,} chars → {stat['estimated_chunks']} chunks") + + # Package with streaming + print("\n" + "=" * 60) + print("STREAMING INGESTION") + print("=" * 60) + + output = adaptor.package_streaming( + skill_dir, + Path("output"), + chunk_size=2000, + chunk_overlap=100, + batch_size=50 + ) + + print(f"\nāœ… Complete! Output: {output}") + + +if __name__ == "__main__": + demo_streaming() diff --git a/src/skill_seekers/cli/package_skill.py b/src/skill_seekers/cli/package_skill.py index 8eaa768..a5b814d 100644 --- a/src/skill_seekers/cli/package_skill.py +++ b/src/skill_seekers/cli/package_skill.py @@ -35,7 +35,16 @@ except ImportError: ) -def package_skill(skill_dir, open_folder_after=True, skip_quality_check=False, target="claude"): +def package_skill( + skill_dir, + open_folder_after=True, + skip_quality_check=False, + target="claude", + streaming=False, + chunk_size=4000, + chunk_overlap=200, + batch_size=100 +): """ Package a skill directory into platform-specific format @@ -44,6 +53,10 @@ def package_skill(skill_dir, open_folder_after=True, skip_quality_check=False, t open_folder_after: Whether to open the output folder after packaging skip_quality_check: Skip quality checks before packaging target: Target LLM platform ('claude', 'gemini', 'openai', 'markdown') + streaming: Use streaming ingestion for large docs + chunk_size: Maximum characters per chunk (streaming mode) + chunk_overlap: Overlap between chunks (streaming mode) + batch_size: Number of chunks per batch (streaming mode) Returns: tuple: (success, 
package_path) where success is bool and package_path is Path or None @@ -97,8 +110,25 @@ def package_skill(skill_dir, open_folder_after=True, skip_quality_check=False, t print(f" Target: {adaptor.PLATFORM_NAME}") print(f" Source: {skill_path}") + if streaming: + print(f" Mode: Streaming (chunk_size={chunk_size}, overlap={chunk_overlap})") + try: - package_path = adaptor.package(skill_path, output_dir) + # Use streaming if requested and supported + if streaming and hasattr(adaptor, 'package_streaming'): + package_path = adaptor.package_streaming( + skill_path, + output_dir, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap, + batch_size=batch_size + ) + elif streaming: + print("āš ļø Streaming not supported for this platform, using standard packaging") + package_path = adaptor.package(skill_path, output_dir) + else: + package_path = adaptor.package(skill_path, output_dir) + print(f" Output: {package_path}") except Exception as e: print(f"āŒ Error creating package: {e}") @@ -166,6 +196,33 @@ Examples: help="Automatically upload after packaging (requires platform API key)", ) + parser.add_argument( + "--streaming", + action="store_true", + help="Use streaming ingestion for large docs (memory-efficient, with chunking)", + ) + + parser.add_argument( + "--chunk-size", + type=int, + default=4000, + help="Maximum characters per chunk (streaming mode, default: 4000)", + ) + + parser.add_argument( + "--chunk-overlap", + type=int, + default=200, + help="Overlap between chunks for context (streaming mode, default: 200)", + ) + + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Number of chunks per batch (streaming mode, default: 100)", + ) + args = parser.parse_args() success, package_path = package_skill( @@ -173,6 +230,10 @@ Examples: open_folder_after=not args.no_open, skip_quality_check=args.skip_quality_check, target=args.target, + streaming=args.streaming, + chunk_size=args.chunk_size, + chunk_overlap=args.chunk_overlap, + 
#!/usr/bin/env python3
"""
Streaming Ingestion for Large Documentation Sets

Provides memory-efficient processing and batch upload capabilities for large
skill documentation. Handles chunking, progress tracking, and resume functionality.
"""

import hashlib
import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterator, Optional


@dataclass
class ChunkMetadata:
    """Metadata for a document chunk."""
    chunk_id: str       # deterministic MD5-based identifier
    source: str         # skill name the chunk came from
    category: str       # document category (e.g. "overview")
    file: str           # originating filename
    chunk_index: int    # 0-based position within the document
    total_chunks: int   # number of chunks the document produced
    char_start: int     # inclusive character offset in the document
    char_end: int       # exclusive character offset in the document


@dataclass
class IngestionProgress:
    """Progress tracking for streaming ingestion."""
    total_documents: int
    processed_documents: int
    total_chunks: int
    processed_chunks: int
    failed_chunks: int
    bytes_processed: int
    start_time: float

    @property
    def progress_percent(self) -> float:
        """Percentage of chunks processed (0.0 before any chunk is known)."""
        if self.total_chunks == 0:
            return 0.0
        return (self.processed_chunks / self.total_chunks) * 100

    @property
    def elapsed_time(self) -> float:
        """Seconds elapsed since start_time."""
        return time.time() - self.start_time

    @property
    def chunks_per_second(self) -> float:
        """Processing rate; 0.0 before any time has elapsed."""
        elapsed = self.elapsed_time
        if elapsed == 0:
            return 0.0
        return self.processed_chunks / elapsed

    @property
    def eta_seconds(self) -> float:
        """Estimated seconds remaining; 0.0 when the rate is unknown."""
        rate = self.chunks_per_second
        if rate == 0:
            return 0.0
        remaining = self.total_chunks - self.processed_chunks
        return remaining / rate


class StreamingIngester:
    """
    Streaming ingestion manager for large documentation sets.

    Provides memory-efficient processing with chunking, progress tracking,
    and resume capabilities.
    """

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        batch_size: int = 100,
        max_memory_mb: int = 500
    ):
        """
        Initialize streaming ingester.

        Args:
            chunk_size: Maximum characters per chunk
            chunk_overlap: Overlap between chunks (for context)
            batch_size: Number of chunks per batch
            max_memory_mb: Maximum memory usage in MB (advisory; not enforced here)
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.batch_size = batch_size
        self.max_memory_mb = max_memory_mb
        # Populated by stream_skill_directory(); None until streaming starts.
        self.progress: Optional[IngestionProgress] = None

    def chunk_document(
        self,
        content: str,
        metadata: dict,
        chunk_size: Optional[int] = None,
        chunk_overlap: Optional[int] = None
    ) -> Iterator[tuple[str, ChunkMetadata]]:
        """
        Split a document into overlapping chunks.

        Args:
            content: Document content
            metadata: Document metadata (keys: source, category, file, ...)
            chunk_size: Override default chunk size
            chunk_overlap: Override default overlap

        Yields:
            Tuple of (chunk_text, chunk_metadata)
        """
        chunk_size = chunk_size or self.chunk_size
        chunk_overlap = chunk_overlap or self.chunk_overlap

        if len(content) <= chunk_size:
            # Document fits in a single chunk
            chunk_meta = ChunkMetadata(
                chunk_id=self._generate_chunk_id(content, metadata, 0),
                source=metadata.get("source", ""),
                category=metadata.get("category", ""),
                file=metadata.get("file", ""),
                chunk_index=0,
                total_chunks=1,
                char_start=0,
                char_end=len(content)
            )
            yield content, chunk_meta
            return

        # Each step advances by (chunk_size - chunk_overlap) characters.
        effective_chunk_size = chunk_size - chunk_overlap
        total_chunks = (len(content) - chunk_overlap) // effective_chunk_size + 1

        # Generate chunks with overlap
        for i in range(total_chunks):
            start = i * effective_chunk_size
            end = start + chunk_size

            # Clamp the final chunk to the document end
            if end > len(content):
                end = len(content)

            chunk_text = content[start:end]

            # Skip whitespace-only chunks
            if not chunk_text.strip():
                continue

            chunk_meta = ChunkMetadata(
                chunk_id=self._generate_chunk_id(chunk_text, metadata, i),
                source=metadata.get("source", ""),
                category=metadata.get("category", ""),
                file=metadata.get("file", ""),
                chunk_index=i,
                total_chunks=total_chunks,
                char_start=start,
                char_end=end
            )

            yield chunk_text, chunk_meta

    def _generate_chunk_id(self, content: str, metadata: dict, chunk_index: int) -> str:
        """Generate a deterministic chunk ID (MD5 used as a fingerprint, not for security)."""
        id_string = (
            f"{metadata.get('source', '')}-"
            f"{metadata.get('file', '')}-"
            f"{chunk_index}-"
            f"{content[:50]}"
        )
        return hashlib.md5(id_string.encode()).hexdigest()

    def stream_skill_directory(
        self,
        skill_dir: Path,
        callback: Optional[Callable[[IngestionProgress], None]] = None
    ) -> Iterator[tuple[str, dict]]:
        """
        Stream all documents from a skill directory.

        Args:
            skill_dir: Path to skill directory
            callback: Optional progress callback(progress: IngestionProgress)

        Yields:
            Tuple of (chunk_text, chunk_metadata_dict)
        """
        skill_dir = Path(skill_dir)

        # Enumerate documents first so total_documents is known up front.
        doc_files = []

        # SKILL.md
        skill_md = skill_dir / "SKILL.md"
        if skill_md.exists():
            doc_files.append(("SKILL.md", "overview", skill_md))

        # Reference files
        refs_dir = skill_dir / "references"
        if refs_dir.exists():
            for ref_file in sorted(refs_dir.glob("*.md")):
                if ref_file.is_file() and not ref_file.name.startswith("."):
                    category = ref_file.stem.replace("_", " ").lower()
                    doc_files.append((ref_file.name, category, ref_file))

        # Initialize progress tracking. total_chunks grows as documents are
        # chunked, so progress_percent is only approximate mid-stream.
        self.progress = IngestionProgress(
            total_documents=len(doc_files),
            processed_documents=0,
            total_chunks=0,
            processed_chunks=0,
            failed_chunks=0,
            bytes_processed=0,
            start_time=time.time()
        )

        # Process each document
        for filename, category, filepath in doc_files:
            try:
                content = filepath.read_text(encoding="utf-8")

                if not content.strip():
                    # Empty files count as processed but yield nothing.
                    self.progress.processed_documents += 1
                    continue

                metadata = {
                    "source": skill_dir.name,
                    "category": category,
                    "file": filename,
                    "type": "documentation" if filename == "SKILL.md" else "reference",
                    "version": "1.0.0"
                }

                # Chunk document and yield chunks
                for chunk_text, chunk_meta in self.chunk_document(content, metadata):
                    self.progress.total_chunks += 1

                    # Flatten chunk metadata into a plain dict for consumers
                    chunk_dict = {
                        "content": chunk_text,
                        "chunk_id": chunk_meta.chunk_id,
                        "source": chunk_meta.source,
                        "category": chunk_meta.category,
                        "file": chunk_meta.file,
                        "chunk_index": chunk_meta.chunk_index,
                        "total_chunks": chunk_meta.total_chunks,
                        "char_start": chunk_meta.char_start,
                        "char_end": chunk_meta.char_end,
                    }

                    yield chunk_text, chunk_dict

                    self.progress.processed_chunks += 1
                    self.progress.bytes_processed += len(chunk_text.encode("utf-8"))

                    # Callback for progress updates
                    if callback:
                        callback(self.progress)

                self.progress.processed_documents += 1

            except Exception as e:
                # Report which file failed instead of the old "(unknown)" placeholder.
                print(f"āš ļø  Warning: Failed to process {filepath}: {e}")
                self.progress.failed_chunks += 1
                continue

    def batch_iterator(
        self,
        chunks: Iterator[tuple[str, dict]],
        batch_size: Optional[int] = None
    ) -> Iterator[list[tuple[str, dict]]]:
        """
        Group chunks into batches for efficient processing.

        Args:
            chunks: Iterator of (chunk_text, chunk_metadata) tuples
            batch_size: Override default batch size

        Yields:
            List of chunks (batch)
        """
        batch_size = batch_size or self.batch_size
        batch = []

        for chunk in chunks:
            batch.append(chunk)

            if len(batch) >= batch_size:
                yield batch
                batch = []

        # Yield the final partial batch
        if batch:
            yield batch

    def save_checkpoint(self, checkpoint_path: Path, state: dict) -> None:
        """
        Save an ingestion checkpoint for resume capability.

        Args:
            checkpoint_path: Path to checkpoint file
            state: State dictionary to save

        Raises:
            ValueError: If no ingestion has been started (no progress to save).
        """
        if self.progress is None:
            raise ValueError("No ingestion in progress; nothing to checkpoint")

        checkpoint_path = Path(checkpoint_path)
        checkpoint_path.parent.mkdir(parents=True, exist_ok=True)

        checkpoint_data = {
            "timestamp": time.time(),
            "progress": {
                "total_documents": self.progress.total_documents,
                "processed_documents": self.progress.processed_documents,
                "total_chunks": self.progress.total_chunks,
                "processed_chunks": self.progress.processed_chunks,
                "failed_chunks": self.progress.failed_chunks,
                "bytes_processed": self.progress.bytes_processed,
            },
            "state": state
        }

        checkpoint_path.write_text(json.dumps(checkpoint_data, indent=2))

    def load_checkpoint(self, checkpoint_path: Path) -> Optional[dict]:
        """
        Load an ingestion checkpoint for resume.

        Args:
            checkpoint_path: Path to checkpoint file

        Returns:
            State dictionary or None if not found / unreadable
        """
        checkpoint_path = Path(checkpoint_path)

        if not checkpoint_path.exists():
            return None

        try:
            checkpoint_data = json.loads(checkpoint_path.read_text())
            return checkpoint_data.get("state")
        except Exception as e:
            # Best-effort: a corrupt checkpoint means "start from scratch".
            print(f"āš ļø  Warning: Failed to load checkpoint: {e}")
            return None

    def format_progress(self) -> str:
        """
        Format progress as a human-readable string.

        Returns:
            Progress string
        """
        if not self.progress:
            return "No progress data"

        p = self.progress

        lines = [
            f"šŸ“Š Progress: {p.progress_percent:.1f}% complete",
            f"   Documents: {p.processed_documents}/{p.total_documents}",
            f"   Chunks: {p.processed_chunks}/{p.total_chunks}",
            f"   Rate: {p.chunks_per_second:.1f} chunks/sec",
            f"   Elapsed: {p.elapsed_time:.1f}s",
        ]

        if p.eta_seconds > 0:
            lines.append(f"   ETA: {p.eta_seconds:.1f}s")

        if p.failed_chunks > 0:
            lines.append(f"   āš ļø  Failed: {p.failed_chunks} chunks")

        return "\n".join(lines)


def example_usage():
    """Example usage of streaming ingestion."""

    # Initialize ingester
    ingester = StreamingIngester(
        chunk_size=4000,
        chunk_overlap=200,
        batch_size=100
    )

    # Progress callback
    def on_progress(progress: IngestionProgress):
        if progress.processed_chunks % 10 == 0:
            print(f"Progress: {progress.progress_percent:.1f}% - "
                  f"{progress.processed_chunks}/{progress.total_chunks} chunks")

    # Stream skill directory
    skill_dir = Path("output/react")
    chunks = ingester.stream_skill_directory(skill_dir, callback=on_progress)

    # Process in batches
    all_chunks = []
    for batch in ingester.batch_iterator(chunks, batch_size=50):
        print(f"\nProcessing batch of {len(batch)} chunks...")
        all_chunks.extend(batch)

        # Save checkpoint every batch
        ingester.save_checkpoint(
            Path("output/.checkpoints/react.json"),
            {"processed_batches": len(all_chunks) // 50}
        )

    # Final progress
    print("\n" + ingester.format_progress())
    print(f"\nāœ… Processed {len(all_chunks)} total chunks")


if __name__ == "__main__":
    example_usage()
#!/usr/bin/env python3
"""
Tests for streaming ingestion functionality.

Validates:
- Chunking strategy (size, overlap)
- Memory-efficient processing
- Progress tracking
- Batch processing
- Resume capability
"""

import json
import sys
import tempfile
from pathlib import Path

import pytest

# Make the in-repo package importable without installation.
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from skill_seekers.cli.streaming_ingest import (
    StreamingIngester,
    IngestionProgress,
    ChunkMetadata
)


@pytest.fixture
def temp_skill_dir():
    """Create a throwaway skill directory with one SKILL.md and two references."""
    with tempfile.TemporaryDirectory() as tmpdir:
        skill_dir = Path(tmpdir) / "test_skill"
        skill_dir.mkdir()

        (skill_dir / "SKILL.md").write_text(
            "# Test Skill\n\n" + ("This is a test document. " * 200)
        )

        refs_dir = skill_dir / "references"
        refs_dir.mkdir()
        (refs_dir / "getting_started.md").write_text(
            "# Getting Started\n\n" + ("Step by step guide. " * 100)
        )
        (refs_dir / "api_reference.md").write_text(
            "# API Reference\n\n" + ("API documentation. " * 150)
        )

        yield skill_dir


def test_chunk_document_single_chunk():
    """A document shorter than chunk_size yields exactly one chunk."""
    ingester = StreamingIngester(chunk_size=1000, chunk_overlap=100)
    meta = {"source": "test", "file": "test.md", "category": "overview"}

    produced = list(ingester.chunk_document("Small document", meta))

    assert len(produced) == 1
    text, chunk_meta = produced[0]

    assert text == "Small document"
    assert chunk_meta.chunk_index == 0
    assert chunk_meta.total_chunks == 1
    assert chunk_meta.source == "test"


def test_chunk_document_multiple_chunks():
    """Long content is split into several chunks whose spans overlap."""
    ingester = StreamingIngester(chunk_size=100, chunk_overlap=20)
    meta = {"source": "test", "file": "test.md", "category": "overview"}

    produced = list(ingester.chunk_document("A" * 250, meta))

    assert len(produced) > 1

    # Each chunk must begin before its predecessor ends (overlap).
    for (_, left), (_, right) in zip(produced, produced[1:]):
        assert right.char_start < left.char_end


def test_chunk_document_metadata():
    """Every chunk carries complete, correctly indexed metadata."""
    ingester = StreamingIngester(chunk_size=100, chunk_overlap=20)
    meta = {"source": "test_source", "file": "test_file.md", "category": "test_cat"}

    produced = list(ingester.chunk_document("B" * 250, meta))

    for index, (_, chunk_meta) in enumerate(produced):
        assert chunk_meta.chunk_index == index
        assert chunk_meta.total_chunks == len(produced)
        assert chunk_meta.source == "test_source"
        assert chunk_meta.file == "test_file.md"
        assert chunk_meta.category == "test_cat"
        assert len(chunk_meta.chunk_id) == 32  # MD5 hex digest
def test_stream_skill_directory(temp_skill_dir):
    """Streaming a full skill directory yields chunks and tracks progress."""
    ingester = StreamingIngester(chunk_size=500, chunk_overlap=50)

    streamed = list(ingester.stream_skill_directory(temp_skill_dir))

    assert streamed

    progress = ingester.progress
    assert progress is not None
    assert progress.total_documents == 3  # SKILL.md + 2 refs
    assert progress.processed_documents == 3
    assert progress.total_chunks > 0
    assert progress.processed_chunks == len(streamed)

    for text, meta in streamed:
        assert text
        assert meta["chunk_id"]

    assert "test_skill" in {meta["source"] for _, meta in streamed}
    assert "overview" in {meta["category"] for _, meta in streamed}


def test_batch_iterator():
    """Chunks are grouped into batch_size groups plus a final remainder."""
    ingester = StreamingIngester()
    dummy = [(f"chunk_{i}", {"id": i}) for i in range(25)]

    batches = list(ingester.batch_iterator(iter(dummy), batch_size=10))

    # 25 chunks at batch size 10 -> 10, 10, 5
    assert [len(batch) for batch in batches] == [10, 10, 5]


def test_progress_tracking(temp_skill_dir):
    """The callback receives monotonically increasing chunk counts."""
    ingester = StreamingIngester(chunk_size=200, chunk_overlap=20)
    updates = []

    def record(progress: IngestionProgress):
        updates.append({
            "processed_docs": progress.processed_documents,
            "processed_chunks": progress.processed_chunks,
            "percent": progress.progress_percent
        })

    list(ingester.stream_skill_directory(temp_skill_dir, callback=record))

    assert updates

    for earlier, later in zip(updates, updates[1:]):
        assert later["processed_chunks"] >= earlier["processed_chunks"]


def test_checkpoint_save_load():
    """A saved checkpoint round-trips its state dictionary."""
    ingester = StreamingIngester()

    with tempfile.TemporaryDirectory() as tmpdir:
        checkpoint_path = Path(tmpdir) / "checkpoint.json"

        ingester.progress = IngestionProgress(
            total_documents=10,
            processed_documents=5,
            total_chunks=100,
            processed_chunks=50,
            failed_chunks=2,
            bytes_processed=10000,
            start_time=1234567890.0
        )

        state = {"last_processed_file": "test.md", "batch_number": 3}
        ingester.save_checkpoint(checkpoint_path, state)

        assert checkpoint_path.exists()
        assert ingester.load_checkpoint(checkpoint_path) == state


def test_format_progress():
    """format_progress embeds percentage, chunk and document counts."""
    ingester = StreamingIngester()
    ingester.progress = IngestionProgress(
        total_documents=10,
        processed_documents=5,
        total_chunks=100,
        processed_chunks=50,
        failed_chunks=0,
        bytes_processed=10000,
        start_time=0.0
    )

    rendered = ingester.format_progress()

    assert "50.0%" in rendered
    assert "50/100" in rendered
    assert "5/10" in rendered


def test_empty_directory():
    """An empty skill directory streams no chunks and counts no documents."""
    ingester = StreamingIngester()

    with tempfile.TemporaryDirectory() as tmpdir:
        empty_dir = Path(tmpdir) / "empty"
        empty_dir.mkdir()

        assert list(ingester.stream_skill_directory(empty_dir)) == []
        assert ingester.progress.total_documents == 0


def test_chunk_size_validation():
    """Smaller chunk sizes produce more chunks for the same content."""
    content = "X" * 1000
    meta = {"source": "test", "file": "test.md", "category": "test"}

    small = list(
        StreamingIngester(chunk_size=100, chunk_overlap=10).chunk_document(content, meta)
    )
    large = list(
        StreamingIngester(chunk_size=500, chunk_overlap=50).chunk_document(content, meta)
    )

    assert len(small) > len(large)


if __name__ == "__main__":
    pytest.main([__file__, "-v"])