#!/usr/bin/env python3 """ PDF Text Extractor - Complete Feature Set (Tasks B1.2 + B1.3 + B1.4 + B1.5 + Priority 2 & 3) Extracts text, code blocks, and images from PDF documentation files. Uses PyMuPDF (fitz) for fast, high-quality extraction. Features: - Text and markdown extraction - Code block detection (font, indent, pattern) - Language detection with confidence scoring (19+ languages) (B1.4) - Syntax validation and quality scoring (B1.4) - Quality statistics and filtering (B1.4) - Image extraction to files (B1.5) - Image filtering by size (B1.5) - Page chunking and chapter detection (B1.3) - Code block merging across pages (B1.3) Advanced Features (Priority 2 & 3): - OCR support for scanned PDFs (requires pytesseract) (Priority 2) - Password-protected PDF support (Priority 2) - Table extraction (Priority 2) - Parallel page processing (Priority 3) - Caching of expensive operations (Priority 3) Usage: # Basic extraction python3 pdf_extractor_poc.py input.pdf python3 pdf_extractor_poc.py input.pdf --output output.json python3 pdf_extractor_poc.py input.pdf --verbose # Quality filtering python3 pdf_extractor_poc.py input.pdf --min-quality 5.0 # Image extraction python3 pdf_extractor_poc.py input.pdf --extract-images python3 pdf_extractor_poc.py input.pdf --extract-images --image-dir images/ # Advanced features python3 pdf_extractor_poc.py scanned.pdf --ocr python3 pdf_extractor_poc.py encrypted.pdf --password mypassword python3 pdf_extractor_poc.py input.pdf --extract-tables python3 pdf_extractor_poc.py large.pdf --parallel --workers 8 Example: python3 pdf_extractor_poc.py docs/manual.pdf -o output.json -v \ --chunk-size 15 --min-quality 6.0 --extract-images \ --extract-tables --parallel """ import argparse import json import os import re import sys from pathlib import Path # Import unified language detector from skill_seekers.cli.language_detector import LanguageDetector # Check if PyMuPDF is installed try: import fitz # PyMuPDF except ImportError: print("ERROR: PyMuPDF not installed") print("Install with: pip install PyMuPDF") sys.exit(1) # Optional dependencies for advanced features try: import pytesseract from PIL import Image TESSERACT_AVAILABLE = True except ImportError: TESSERACT_AVAILABLE = False try: import concurrent.futures CONCURRENT_AVAILABLE = True except ImportError: CONCURRENT_AVAILABLE = False class PDFExtractor: """Extract text and code from PDF documentation""" def __init__( self, pdf_path, verbose=False, chunk_size=10, min_quality=0.0, extract_images=False, image_dir=None, min_image_size=100, use_ocr=False, password=None, extract_tables=False, parallel=False, max_workers=None, use_cache=True, ): self.pdf_path = pdf_path self.verbose = verbose self.chunk_size = chunk_size # Pages per chunk (0 = no chunking) self.min_quality = min_quality # Minimum quality score (0-10) self.extract_images = extract_images # Extract images to files (NEW in B1.5) self.image_dir = image_dir # Directory to save images (NEW in B1.5) self.min_image_size = min_image_size # Minimum image dimension (NEW in B1.5) # Advanced features (Priority 2 & 3) self.use_ocr = use_ocr # OCR for scanned PDFs (Priority 2) self.password = password # Password for encrypted PDFs (Priority 2) self.extract_tables = extract_tables # Extract tables (Priority 2) self.parallel = parallel # Parallel processing (Priority 3) self.max_workers = max_workers or os.cpu_count() # Worker threads (Priority 3) self.use_cache = use_cache # Cache expensive operations (Priority 3) self.doc = None self.pages = [] self.chapters = [] # Detected chapters/sections self.extracted_images = [] # List of extracted image info (NEW in B1.5) self._cache = {} # Cache for expensive operations (Priority 3) # Language detection self.language_detector = LanguageDetector(min_confidence=0.15) def log(self, message): """Print message if verbose mode enabled""" if self.verbose: print(message) def extract_text_with_ocr(self, page): """ Extract text from scanned PDF page using OCR (Priority 2). Falls back to regular text extraction if OCR is not available. Args: page: PyMuPDF page object Returns: str: Extracted text """ # Try regular text extraction first text = page.get_text("text").strip() # If page has very little text, it might be scanned if len(text) < 50 and self.use_ocr: if not TESSERACT_AVAILABLE: self.log("āš ļø OCR requested but pytesseract not installed") self.log(" Install with: pip install pytesseract Pillow") return text try: # Render page as image pix = page.get_pixmap() img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) # Run OCR ocr_text = pytesseract.image_to_string(img) self.log(f" OCR extracted {len(ocr_text)} chars (was {len(text)})") return ocr_text if len(ocr_text) > len(text) else text except Exception as e: self.log(f" OCR failed: {e}") return text return text def extract_tables_from_page(self, page): """ Extract tables from PDF page (Priority 2). Uses PyMuPDF's table detection. Args: page: PyMuPDF page object Returns: list: List of extracted tables as dicts """ if not self.extract_tables: return [] tables = [] try: # PyMuPDF table extraction tabs = page.find_tables() for idx, tab in enumerate(tabs.tables): table_data = { "table_index": idx, "rows": tab.extract(), "bbox": tab.bbox, "row_count": len(tab.extract()), "col_count": len(tab.extract()[0]) if tab.extract() else 0, } tables.append(table_data) self.log( f" Found table {idx}: {table_data['row_count']}x{table_data['col_count']}" ) except Exception as e: self.log(f" Table extraction failed: {e}") return tables def get_cached(self, key): """ Get cached value (Priority 3). Args: key: Cache key Returns: Cached value or None """ if not self.use_cache: return None return self._cache.get(key) def set_cached(self, key, value): """ Set cached value (Priority 3). Args: key: Cache key value: Value to cache """ if self.use_cache: self._cache[key] = value def detect_language_from_code(self, code): """ Detect programming language from code content using patterns. Enhanced in B1.4 with confidence scoring. UPDATED: Now uses shared LanguageDetector with 20+ languages Returns (language, confidence) tuple """ return self.language_detector.detect_from_code(code) def validate_code_syntax(self, code, language): """ Validate code syntax (basic checks). Enhanced in B1.4 with syntax validation. Returns (is_valid, issues) tuple """ issues = [] # Common syntax checks if not code.strip(): return False, ["Empty code block"] # Language-specific validation if language == "python": # Check indentation consistency lines = code.split("\n") indent_chars = set() for line in lines: if line.startswith(" "): indent_chars.add("space") elif line.startswith("\t"): indent_chars.add("tab") if len(indent_chars) > 1: issues.append("Mixed tabs and spaces") # Check for unclosed brackets/parens open_count = code.count("(") + code.count("[") + code.count("{") close_count = code.count(")") + code.count("]") + code.count("}") if abs(open_count - close_count) > 2: # Allow small mismatch issues.append("Unbalanced brackets") elif language in ["javascript", "java", "cpp", "c", "csharp", "go"]: # Check for balanced braces open_braces = code.count("{") close_braces = code.count("}") if abs(open_braces - close_braces) > 1: issues.append("Unbalanced braces") elif language == "json": # Try to parse JSON try: json.loads(code) except (json.JSONDecodeError, ValueError) as e: issues.append(f"Invalid JSON syntax: {str(e)[:50]}") # General checks # Check if code looks like natural language (too many common words) common_words = ["the", "and", "for", "with", "this", "that", "have", "from"] word_count = sum(1 for word in common_words if word in code.lower()) if word_count > 5 and len(code.split()) < 50: issues.append("May be natural language, not code") # Check code/comment ratio comment_lines = sum( 1 for line in code.split("\n") if line.strip().startswith(("#", "//", "/*", "*", "--")) ) total_lines = len([line for line in code.split("\n") if line.strip()]) if total_lines > 0 and comment_lines / total_lines > 0.7: issues.append("Mostly comments") return len(issues) == 0, issues def score_code_quality(self, code, language, confidence): """ Score the quality/usefulness of detected code block. New in B1.4. Returns quality score (0-10) """ score = 5.0 # Start with neutral score # Factor 1: Language detection confidence score += confidence * 2.0 # Factor 2: Code length (not too short, not too long) code_length = len(code.strip()) if 20 <= code_length <= 500: score += 1.0 elif 500 < code_length <= 2000: score += 0.5 elif code_length < 10: score -= 2.0 # Factor 3: Number of lines lines = [line for line in code.split("\n") if line.strip()] if 2 <= len(lines) <= 50: score += 1.0 elif len(lines) > 100: score -= 1.0 # Factor 4: Has function/class definitions if re.search(r"\b(def|function|class|func|fn|public class)\b", code): score += 1.5 # Factor 5: Has meaningful variable names (not just x, y, i) meaningful_vars = re.findall(r"\b[a-z_][a-z0-9_]{3,}\b", code.lower()) if len(meaningful_vars) >= 2: score += 1.0 # Factor 6: Syntax validation is_valid, issues = self.validate_code_syntax(code, language) if is_valid: score += 1.0 else: score -= len(issues) * 0.5 # Clamp score to 0-10 range return max(0, min(10, score)) def detect_code_blocks_by_font(self, page): """ Detect code blocks by analyzing font properties. Monospace fonts typically indicate code. Returns list of detected code blocks with metadata. """ code_blocks = [] blocks = page.get_text("dict")["blocks"] monospace_fonts = ["courier", "mono", "consolas", "menlo", "monaco", "dejavu"] current_code = [] current_font = None for block in blocks: if "lines" not in block: continue for line in block["lines"]: for span in line["spans"]: font = span["font"].lower() text = span["text"] # Check if font is monospace is_monospace = any(mf in font for mf in monospace_fonts) if is_monospace: # Accumulate code text current_code.append(text) current_font = span["font"] else: # End of code block if current_code: code_text = "".join(current_code).strip() if len(code_text) > 10: # Minimum code length lang, confidence = self.detect_language_from_code(code_text) quality = self.score_code_quality(code_text, lang, confidence) is_valid, issues = self.validate_code_syntax(code_text, lang) code_blocks.append( { "code": code_text, "language": lang, "confidence": confidence, "quality_score": quality, "is_valid": is_valid, "validation_issues": issues if not is_valid else [], "font": current_font, "detection_method": "font", } ) current_code = [] current_font = None # Handle final code block if current_code: code_text = "".join(current_code).strip() if len(code_text) > 10: lang, confidence = self.detect_language_from_code(code_text) quality = self.score_code_quality(code_text, lang, confidence) is_valid, issues = self.validate_code_syntax(code_text, lang) code_blocks.append( { "code": code_text, "language": lang, "confidence": confidence, "quality_score": quality, "is_valid": is_valid, "validation_issues": issues if not is_valid else [], "font": current_font, "detection_method": "font", } ) return code_blocks def detect_code_blocks_by_indent(self, text): """ Detect code blocks by indentation patterns. Code often has consistent indentation. Returns list of detected code blocks. """ code_blocks = [] lines = text.split("\n") current_block = [] indent_pattern = None for line in lines: # Check for indentation (4 spaces or tab) if line.startswith(" ") or line.startswith("\t"): # Start or continue code block if not indent_pattern: indent_pattern = line[:4] if line.startswith(" ") else "\t" current_block.append(line) else: # End of code block if current_block and len(current_block) >= 2: # At least 2 lines code_text = "\n".join(current_block).strip() if len(code_text) > 20: # Minimum code length lang, confidence = self.detect_language_from_code(code_text) quality = self.score_code_quality(code_text, lang, confidence) is_valid, issues = self.validate_code_syntax(code_text, lang) code_blocks.append( { "code": code_text, "language": lang, "confidence": confidence, "quality_score": quality, "is_valid": is_valid, "validation_issues": issues if not is_valid else [], "detection_method": "indent", } ) current_block = [] indent_pattern = None # Handle final block if current_block and len(current_block) >= 2: code_text = "\n".join(current_block).strip() if len(code_text) > 20: lang, confidence = self.detect_language_from_code(code_text) quality = self.score_code_quality(code_text, lang, confidence) is_valid, issues = self.validate_code_syntax(code_text, lang) code_blocks.append( { "code": code_text, "language": lang, "confidence": confidence, "quality_score": quality, "is_valid": is_valid, "validation_issues": issues if not is_valid else [], "detection_method": "indent", } ) return code_blocks def detect_code_blocks_by_pattern(self, text): """ Detect code blocks by common code patterns (keywords, syntax). Returns list of detected code snippets. """ code_blocks = [] # Common code patterns that span multiple lines patterns = [ # Function definitions ( r"((?:def|function|func|fn|public|private)\s+\w+\s*\([^)]*\)\s*[{:]?[^}]*[}]?)", "function", ), # Class definitions (r"(class\s+\w+[^{]*\{[^}]*\})", "class"), # Import statements block ( r"((?:import|require|use|include)[^\n]+(?:\n(?:import|require|use|include)[^\n]+)*)", "imports", ), ] for pattern, block_type in patterns: matches = re.finditer(pattern, text, re.MULTILINE | re.DOTALL) for match in matches: code_text = match.group(1).strip() if len(code_text) > 15: lang, confidence = self.detect_language_from_code(code_text) quality = self.score_code_quality(code_text, lang, confidence) is_valid, issues = self.validate_code_syntax(code_text, lang) code_blocks.append( { "code": code_text, "language": lang, "confidence": confidence, "quality_score": quality, "is_valid": is_valid, "validation_issues": issues if not is_valid else [], "detection_method": "pattern", "pattern_type": block_type, } ) return code_blocks def detect_chapter_start(self, page_data): """ Detect if a page starts a new chapter/section. Returns (is_chapter_start, chapter_title) tuple. """ headings = page_data.get("headings", []) # Check for h1 or h2 at start of page if headings: first_heading = headings[0] # H1 headings are strong indicators of chapters if first_heading["level"] in ["h1", "h2"]: return True, first_heading["text"] # Check for specific chapter markers in text text = page_data.get("text", "") first_line = text.split("\n")[0] if text else "" chapter_patterns = [ r"^Chapter\s+\d+", r"^Part\s+\d+", r"^Section\s+\d+", r"^\d+\.\s+[A-Z]", # "1. Introduction" ] for pattern in chapter_patterns: if re.match(pattern, first_line, re.IGNORECASE): return True, first_line.strip() return False, None def merge_continued_code_blocks(self, pages): """ Merge code blocks that are split across pages. Detects when a code block at the end of one page continues on the next page. """ for i in range(len(pages) - 1): current_page = pages[i] next_page = pages[i + 1] # Check if current page has code blocks if not current_page["code_samples"]: continue # Get last code block of current page last_code = current_page["code_samples"][-1] # Check if next page starts with code if not next_page["code_samples"]: continue first_next_code = next_page["code_samples"][0] # Same language and detection method = likely continuation if ( last_code["language"] == first_next_code["language"] and last_code["detection_method"] == first_next_code["detection_method"] ): # Check if last code block looks incomplete (doesn't end with closing brace/etc) last_code_text = last_code["code"].rstrip() continuation_indicators = [ not last_code_text.endswith("}"), not last_code_text.endswith(";"), last_code_text.endswith(","), last_code_text.endswith("\\"), ] if any(continuation_indicators): # Merge the code blocks merged_code = last_code["code"] + "\n" + first_next_code["code"] last_code["code"] = merged_code last_code["merged_from_next_page"] = True # Remove the first code block from next page next_page["code_samples"].pop(0) next_page["code_blocks_count"] -= 1 self.log(f" Merged code block from page {i + 1} to {i + 2}") return pages def create_chunks(self, pages): """ Create chunks of pages for better organization. Returns array of chunks, each containing: - chunk_number - start_page, end_page - pages (array) - chapter_title (if detected) """ if self.chunk_size == 0: # No chunking - return all pages as one chunk return [ { "chunk_number": 1, "start_page": 1, "end_page": len(pages), "pages": pages, "chapter_title": None, } ] chunks = [] current_chunk = [] chunk_start = 0 current_chapter = None for i, page in enumerate(pages): # Check if this page starts a new chapter is_chapter, chapter_title = self.detect_chapter_start(page) if is_chapter and current_chunk: # Save current chunk before starting new one chunks.append( { "chunk_number": len(chunks) + 1, "start_page": chunk_start + 1, "end_page": i, "pages": current_chunk, "chapter_title": current_chapter, } ) current_chunk = [] chunk_start = i current_chapter = chapter_title if not current_chapter and is_chapter: current_chapter = chapter_title current_chunk.append(page) # Check if chunk size reached (but don't break chapters) if not is_chapter and len(current_chunk) >= self.chunk_size: chunks.append( { "chunk_number": len(chunks) + 1, "start_page": chunk_start + 1, "end_page": i + 1, "pages": current_chunk, "chapter_title": current_chapter, } ) current_chunk = [] chunk_start = i + 1 current_chapter = None # Add remaining pages as final chunk if current_chunk: chunks.append( { "chunk_number": len(chunks) + 1, "start_page": chunk_start + 1, "end_page": len(pages), "pages": current_chunk, "chapter_title": current_chapter, } ) return chunks def extract_images_from_page(self, page, page_num): """ Extract images from a PDF page and save to disk (NEW in B1.5). Returns list of extracted image metadata. """ if not self.extract_images: # Just count images, don't extract return [] extracted = [] image_list = page.get_images() for img_index, img in enumerate(image_list): try: xref = img[0] # Image XREF number base_image = self.doc.extract_image(xref) if not base_image: continue image_bytes = base_image["image"] image_ext = base_image["ext"] # png, jpeg, etc. width = base_image.get("width", 0) height = base_image.get("height", 0) # Filter out small images (icons, bullets, etc.) if width < self.min_image_size or height < self.min_image_size: self.log(f" Skipping small image: {width}x{height}") continue # Generate filename pdf_basename = Path(self.pdf_path).stem image_filename = f"{pdf_basename}_page{page_num + 1}_img{img_index + 1}.{image_ext}" # Save image image_path = Path(self.image_dir) / image_filename image_path.parent.mkdir(parents=True, exist_ok=True) with open(image_path, "wb") as f: f.write(image_bytes) # Store metadata image_info = { "filename": image_filename, "path": str(image_path), "page_number": page_num + 1, "width": width, "height": height, "format": image_ext, "size_bytes": len(image_bytes), "xref": xref, } extracted.append(image_info) self.extracted_images.append(image_info) self.log(f" Extracted image: {image_filename} ({width}x{height})") except Exception as e: self.log(f" Error extracting image {img_index}: {e}") continue return extracted def extract_page(self, page_num): """ Extract content from a single PDF page. Returns dict with page content, code blocks, and metadata. """ # Check cache first (Priority 3) cache_key = f"page_{page_num}" cached = self.get_cached(cache_key) if cached is not None: self.log(f" Page {page_num + 1}: Using cached data") return cached page = self.doc.load_page(page_num) # Extract plain text (with OCR if enabled - Priority 2) text = self.extract_text_with_ocr(page) if self.use_ocr else page.get_text("text") # Extract markdown (better structure preservation) # Use "text" format with layout info for PyMuDF 1.24+ try: markdown = page.get_text("markdown") except (AssertionError, ValueError, RuntimeError, TypeError, AttributeError): # Fallback to text format for incompatible PyMuPDF versions # Some versions don't support "markdown" format or have internal errors markdown = page.get_text( "text", flags=fitz.TEXT_PRESERVE_WHITESPACE | fitz.TEXT_PRESERVE_LIGATURES | fitz.TEXT_PRESERVE_SPANS, ) # Extract tables (Priority 2) tables = self.extract_tables_from_page(page) # Get page images (for diagrams) images = page.get_images() # Extract images to files (NEW in B1.5) extracted_images = self.extract_images_from_page(page, page_num) # Detect code blocks using multiple methods font_code_blocks = self.detect_code_blocks_by_font(page) indent_code_blocks = self.detect_code_blocks_by_indent(text) pattern_code_blocks = self.detect_code_blocks_by_pattern(text) # Merge and deduplicate code blocks all_code_blocks = font_code_blocks + indent_code_blocks + pattern_code_blocks # Simple deduplication by code content unique_code = {} for block in all_code_blocks: code_hash = hash(block["code"]) if code_hash not in unique_code: unique_code[code_hash] = block else: # Keep the one with higher quality score if block["quality_score"] > unique_code[code_hash]["quality_score"]: unique_code[code_hash] = block code_samples = list(unique_code.values()) # Filter by minimum quality (NEW in B1.4) if self.min_quality > 0: code_samples_before = len(code_samples) code_samples = [c for c in code_samples if c["quality_score"] >= self.min_quality] filtered_count = code_samples_before - len(code_samples) if filtered_count > 0: self.log( f" Filtered out {filtered_count} low-quality code blocks (min_quality={self.min_quality})" ) # Sort by quality score (highest first) code_samples.sort(key=lambda x: x["quality_score"], reverse=True) # Extract headings from markdown headings = [] for line in markdown.split("\n"): if line.startswith("#"): level = len(line) - len(line.lstrip("#")) text = line.lstrip("#").strip() if text: headings.append({"level": f"h{level}", "text": text}) page_data = { "page_number": page_num + 1, # 1-indexed for humans "text": text.strip(), "markdown": markdown.strip(), "headings": headings, "code_samples": code_samples, "images_count": len(images), "extracted_images": extracted_images, # NEW in B1.5 "tables": tables, # NEW in Priority 2 "char_count": len(text), "code_blocks_count": len(code_samples), "tables_count": len(tables), # NEW in Priority 2 } # Cache the result (Priority 3) self.set_cached(cache_key, page_data) self.log( f" Page {page_num + 1}: {len(text)} chars, {len(code_samples)} code blocks, {len(headings)} headings, {len(extracted_images)} images, {len(tables)} tables" ) return page_data def extract_all(self): """ Extract content from all pages of the PDF. Enhanced with password support and parallel processing. Returns dict with metadata and pages array. """ print(f"\nšŸ“„ Extracting from: {self.pdf_path}") # Open PDF (with password support - Priority 2) try: self.doc = fitz.open(self.pdf_path) # Handle encrypted PDFs (Priority 2) if self.doc.is_encrypted: if self.password: print(" šŸ” PDF is encrypted, trying password...") if self.doc.authenticate(self.password): print(" āœ… Password accepted") else: print(" āŒ Invalid password") return None else: print(" āŒ PDF is encrypted but no password provided") print(" Use --password option to provide password") return None except Exception as e: print(f"āŒ Error opening PDF: {e}") return None print(f" Pages: {len(self.doc)}") print(f" Metadata: {self.doc.metadata}") # Set up image directory (NEW in B1.5) if self.extract_images and not self.image_dir: pdf_basename = Path(self.pdf_path).stem self.image_dir = f"output/{pdf_basename}_images" print(f" Image directory: {self.image_dir}") # Show feature status if self.use_ocr: status = ( "āœ… enabled" if TESSERACT_AVAILABLE else "āš ļø not available (install pytesseract)" ) print(f" OCR: {status}") if self.extract_tables: print(" Table extraction: āœ… enabled") if self.parallel: status = "āœ… enabled" if CONCURRENT_AVAILABLE else "āš ļø not available" print(f" Parallel processing: {status} ({self.max_workers} workers)") if self.use_cache: print(" Caching: āœ… enabled") print("") # Extract each page (with parallel processing - Priority 3) if self.parallel and CONCURRENT_AVAILABLE and len(self.doc) > 5: print( f"šŸš€ Extracting {len(self.doc)} pages in parallel ({self.max_workers} workers)..." ) with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: page_numbers = list(range(len(self.doc))) self.pages = list(executor.map(self.extract_page, page_numbers)) else: # Sequential extraction for page_num in range(len(self.doc)): page_data = self.extract_page(page_num) self.pages.append(page_data) # Merge code blocks that span across pages self.log("\nšŸ”— Merging code blocks across pages...") self.pages = self.merge_continued_code_blocks(self.pages) # Create chunks self.log(f"\nšŸ“¦ Creating chunks (chunk_size={self.chunk_size})...") chunks = self.create_chunks(self.pages) # Build summary total_chars = sum(p["char_count"] for p in self.pages) total_code_blocks = sum(p["code_blocks_count"] for p in self.pages) total_headings = sum(len(p["headings"]) for p in self.pages) total_images = sum(p["images_count"] for p in self.pages) total_tables = sum(p["tables_count"] for p in self.pages) # NEW in Priority 2 # Detect languages used languages = {} all_code_blocks_list = [] for page in self.pages: for code in page["code_samples"]: lang = code["language"] languages[lang] = languages.get(lang, 0) + 1 all_code_blocks_list.append(code) # Calculate quality statistics (NEW in B1.4) quality_stats = {} if all_code_blocks_list: quality_scores = [c["quality_score"] for c in all_code_blocks_list] confidences = [c["confidence"] for c in all_code_blocks_list] valid_count = sum(1 for c in all_code_blocks_list if c["is_valid"]) quality_stats = { "average_quality": sum(quality_scores) / len(quality_scores), "average_confidence": sum(confidences) / len(confidences), "valid_code_blocks": valid_count, "invalid_code_blocks": total_code_blocks - valid_count, "validation_rate": valid_count / total_code_blocks if total_code_blocks > 0 else 0, "high_quality_blocks": sum(1 for s in quality_scores if s >= 7.0), "medium_quality_blocks": sum(1 for s in quality_scores if 4.0 <= s < 7.0), "low_quality_blocks": sum(1 for s in quality_scores if s < 4.0), } # Extract chapter information chapters = [] for chunk in chunks: if chunk["chapter_title"]: chapters.append( { "title": chunk["chapter_title"], "start_page": chunk["start_page"], "end_page": chunk["end_page"], } ) result = { "source_file": self.pdf_path, "metadata": self.doc.metadata, "total_pages": len(self.doc), "total_chars": total_chars, "total_code_blocks": total_code_blocks, "total_headings": total_headings, "total_images": total_images, "total_extracted_images": len(self.extracted_images), # NEW in B1.5 "total_tables": total_tables, # NEW in Priority 2 "image_directory": self.image_dir if self.extract_images else None, # NEW in B1.5 "extracted_images": self.extracted_images, # NEW in B1.5 "total_chunks": len(chunks), "chapters": chapters, "languages_detected": languages, "quality_statistics": quality_stats, # NEW in B1.4 "chunks": chunks, "pages": self.pages, # Still include all pages for compatibility } # Close document self.doc.close() print("\nāœ… Extraction complete:") print(f" Total characters: {total_chars:,}") print(f" Code blocks found: {total_code_blocks}") print(f" Headings found: {total_headings}") print(f" Images found: {total_images}") if self.extract_images: print(f" Images extracted: {len(self.extracted_images)}") if self.image_dir: print(f" Image directory: {self.image_dir}") if self.extract_tables: print(f" Tables found: {total_tables}") print(f" Chunks created: {len(chunks)}") print(f" Chapters detected: {len(chapters)}") print(f" Languages detected: {', '.join(languages.keys())}") # Print quality statistics (NEW in B1.4) if quality_stats: print("\nšŸ“Š Code Quality Statistics:") print(f" Average quality: {quality_stats['average_quality']:.1f}/10") print(f" Average confidence: {quality_stats['average_confidence']:.1%}") print( f" Valid code blocks: {quality_stats['valid_code_blocks']}/{total_code_blocks} ({quality_stats['validation_rate']:.1%})" ) print(f" High quality (7+): {quality_stats['high_quality_blocks']}") print(f" Medium quality (4-7): {quality_stats['medium_quality_blocks']}") print(f" Low quality (<4): {quality_stats['low_quality_blocks']}") return result def main(): parser = argparse.ArgumentParser( description="Extract text and code blocks from PDF documentation", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Extract from PDF python3 pdf_extractor_poc.py input.pdf # Save to JSON file python3 pdf_extractor_poc.py input.pdf --output result.json # Verbose mode python3 pdf_extractor_poc.py input.pdf --verbose # Extract and save python3 pdf_extractor_poc.py docs/python.pdf -o python_extracted.json -v """, ) parser.add_argument("pdf_file", help="Path to PDF file to extract") parser.add_argument("-o", "--output", help="Output JSON file path (default: print to stdout)") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON output") parser.add_argument( "--chunk-size", type=int, default=10, help="Pages per chunk (0 = no chunking, default: 10)" ) parser.add_argument( "--no-merge", action="store_true", help="Disable merging code blocks across pages" ) parser.add_argument( "--min-quality", type=float, default=0.0, help="Minimum code quality score (0-10, default: 0 = no filtering)", ) parser.add_argument( "--extract-images", action="store_true", help="Extract images to files (NEW in B1.5)" ) parser.add_argument( "--image-dir", type=str, default=None, help="Directory to save extracted images (default: output/{pdf_name}_images)", ) parser.add_argument( "--min-image-size", type=int, default=100, help="Minimum image dimension in pixels (filters icons, default: 100)", ) # Advanced features (Priority 2 & 3) parser.add_argument( "--ocr", action="store_true", help="Use OCR for scanned PDFs (requires pytesseract)" ) parser.add_argument("--password", type=str, default=None, help="Password for encrypted PDF") parser.add_argument( "--extract-tables", action="store_true", help="Extract tables from PDF (Priority 2)" ) parser.add_argument( "--parallel", action="store_true", help="Process pages in parallel (Priority 3)" ) parser.add_argument( "--workers", type=int, default=None, help="Number of parallel workers (default: CPU count)" ) parser.add_argument( "--no-cache", action="store_true", help="Disable caching of expensive operations" ) args = parser.parse_args() # Validate input file if not os.path.exists(args.pdf_file): print(f"āŒ Error: File not found: {args.pdf_file}") sys.exit(1) if not args.pdf_file.lower().endswith(".pdf"): print("āš ļø Warning: File does not have .pdf extension") # Extract extractor = PDFExtractor( args.pdf_file, verbose=args.verbose, chunk_size=args.chunk_size, min_quality=args.min_quality, extract_images=args.extract_images, image_dir=args.image_dir, min_image_size=args.min_image_size, # Advanced features (Priority 2 & 3) use_ocr=args.ocr, password=args.password, extract_tables=args.extract_tables, parallel=args.parallel, max_workers=args.workers, use_cache=not args.no_cache, ) result = extractor.extract_all() if result is None: sys.exit(1) # Output if args.output: # Save to file with open(args.output, "w", encoding="utf-8") as f: if args.pretty: json.dump(result, f, indent=2, ensure_ascii=False) else: json.dump(result, f, ensure_ascii=False) print(f"\nšŸ’¾ Saved to: {args.output}") else: # Print to stdout if args.pretty: print("\n" + json.dumps(result, indent=2, ensure_ascii=False)) else: print(json.dumps(result, ensure_ascii=False)) if __name__ == "__main__": main()