Release v1.27.0: Enhance markdown-tools with Heavy Mode

Add multi-tool orchestration for best-quality document conversion:
- Dual mode: Quick (fast) and Heavy (best quality, multi-tool merge)
- New convert.py - main orchestrator with tool selection matrix
- New merge_outputs.py - segment-level multi-tool output merger
- New validate_output.py - quality validation with HTML reports
- Enhanced extract_pdf_images.py - metadata (page, position, dimensions)
- PyMuPDF4LLM integration for LLM-optimized PDF conversion
- pandoc integration for DOCX/PPTX structure preservation
- Quality metrics: text/table/image retention with pass/warn/fail
- New references: heavy-mode-guide.md, tool-comparison.md

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
daymade
2026-01-25 21:36:08 +08:00
parent 114c355aa8
commit 3f15b8942c
10 changed files with 2009 additions and 89 deletions

434
markdown-tools/scripts/convert.py Executable file
View File

@@ -0,0 +1,434 @@
#!/usr/bin/env python3
"""
Multi-tool document to markdown converter with intelligent orchestration.
Supports Quick Mode (fast, single tool) and Heavy Mode (best quality, multi-tool merge).
Usage:
# Quick Mode (default) - fast, single best tool
uv run --with pymupdf4llm --with markitdown scripts/convert.py document.pdf -o output.md
# Heavy Mode - multi-tool parallel execution with merge
uv run --with pymupdf4llm --with markitdown scripts/convert.py document.pdf -o output.md --heavy
# With image extraction
uv run --with pymupdf4llm scripts/convert.py document.pdf -o output.md --assets-dir ./images
Dependencies:
- pymupdf4llm: PDF conversion (LLM-optimized)
- markitdown: PDF/DOCX/PPTX conversion
- pandoc: DOCX/PPTX conversion (system install: brew install pandoc)
"""
import argparse
import subprocess
import sys
import tempfile
import shutil
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class ConversionResult:
    """Result from a single tool conversion."""
    markdown: str  # converted markdown text ("" on failure)
    tool: str  # tool that produced this result: "pymupdf4llm", "markitdown", or "pandoc"
    images: list[str] = field(default_factory=list)  # paths of extracted image files
    success: bool = True  # False when the conversion failed
    error: str = ""  # error description when success is False
def check_tool_available(tool: str) -> bool:
    """Check if a conversion tool is available.

    Python-based tools are probed by attempting an import; pandoc is
    probed by looking for the executable on PATH.  Unknown tool names
    are reported as unavailable.
    """
    if tool == "pandoc":
        return shutil.which("pandoc") is not None
    if tool in ("pymupdf4llm", "markitdown"):
        try:
            __import__(tool)
        except ImportError:
            return False
        return True
    return False
def select_tools(file_path: Path, mode: str) -> list[str]:
    """Select conversion tools based on file type and mode.

    Quick mode returns at most one tool (the first available in the
    per-format preference order); heavy mode returns every available
    tool so their outputs can be merged.  Unknown extensions fall back
    to markitdown.
    """
    preferences = {
        ".pdf": {
            "quick": ["pymupdf4llm", "markitdown"],  # fallback order
            "heavy": ["pymupdf4llm", "markitdown"],
        },
        ".docx": {
            "quick": ["pandoc", "markitdown"],
            "heavy": ["pandoc", "markitdown"],
        },
        ".doc": {
            "quick": ["pandoc", "markitdown"],
            "heavy": ["pandoc", "markitdown"],
        },
        ".pptx": {
            "quick": ["markitdown", "pandoc"],
            "heavy": ["markitdown", "pandoc"],
        },
        ".xlsx": {
            "quick": ["markitdown"],
            "heavy": ["markitdown"],
        },
    }
    fallback = {"quick": ["markitdown"], "heavy": ["markitdown"]}
    candidates = preferences.get(file_path.suffix.lower(), fallback)
    if mode == "quick":
        # First installed tool wins; empty list when none are installed.
        for candidate in candidates["quick"]:
            if check_tool_available(candidate):
                return [candidate]
        return []
    # Heavy mode: run every installed tool and merge the outputs later.
    return [t for t in candidates["heavy"] if check_tool_available(t)]
def convert_with_pymupdf4llm(
    file_path: Path, assets_dir: Optional[Path] = None
) -> ConversionResult:
    """Convert using PyMuPDF4LLM (best for PDFs)."""
    try:
        import pymupdf4llm

        options = {}
        extracted: list[str] = []
        if assets_dir:
            assets_dir.mkdir(parents=True, exist_ok=True)
            options["write_images"] = True
            options["image_path"] = str(assets_dir)
            options["dpi"] = 150
        # "lines_strict" gives the best table detection.
        options["table_strategy"] = "lines_strict"
        markdown_text = pymupdf4llm.to_markdown(str(file_path), **options)
        # Collect whatever images were written to the assets directory.
        if assets_dir and assets_dir.exists():
            for pattern in ("*.png", "*.jpg"):
                extracted.extend(str(p) for p in assets_dir.glob(pattern))
        return ConversionResult(
            markdown=markdown_text, tool="pymupdf4llm", images=extracted, success=True
        )
    except Exception as e:
        return ConversionResult(
            markdown="", tool="pymupdf4llm", success=False, error=str(e)
        )
def convert_with_markitdown(
    file_path: Path, assets_dir: Optional[Path] = None
) -> ConversionResult:
    """Convert using markitdown (CLI first, Python API as a fallback)."""
    try:
        proc = subprocess.run(
            ["markitdown", str(file_path)],
            capture_output=True,
            text=True,
            timeout=120,
        )
        if proc.returncode != 0:
            return ConversionResult(
                markdown="",
                tool="markitdown",
                success=False,
                error=proc.stderr,
            )
        return ConversionResult(
            markdown=proc.stdout, tool="markitdown", success=True
        )
    except FileNotFoundError:
        # CLI binary not installed -- try the Python API instead.
        try:
            from markitdown import MarkItDown

            converted = MarkItDown().convert(str(file_path))
            return ConversionResult(
                markdown=converted.text_content, tool="markitdown", success=True
            )
        except Exception as e:
            return ConversionResult(
                markdown="", tool="markitdown", success=False, error=str(e)
            )
    except Exception as e:
        return ConversionResult(
            markdown="", tool="markitdown", success=False, error=str(e)
        )
def convert_with_pandoc(
    file_path: Path, assets_dir: Optional[Path] = None
) -> ConversionResult:
    """Convert using the pandoc system binary."""
    try:
        command = ["pandoc", str(file_path), "-t", "markdown", "--wrap=none"]
        if assets_dir:
            assets_dir.mkdir(parents=True, exist_ok=True)
            # Ask pandoc to pull embedded media out into the assets dir.
            command += ["--extract-media", str(assets_dir)]
        proc = subprocess.run(
            command, capture_output=True, text=True, timeout=120
        )
        if proc.returncode != 0:
            return ConversionResult(
                markdown="", tool="pandoc", success=False, error=proc.stderr
            )
        extracted: list[str] = []
        if assets_dir and assets_dir.exists():
            # pandoc nests media in subdirectories, hence rglob.
            for pattern in ("*.png", "*.jpg"):
                extracted.extend(str(p) for p in assets_dir.rglob(pattern))
        return ConversionResult(
            markdown=proc.stdout, tool="pandoc", images=extracted, success=True
        )
    except Exception as e:
        return ConversionResult(
            markdown="", tool="pandoc", success=False, error=str(e)
        )
def convert_single(
    file_path: Path, tool: str, assets_dir: Optional[Path] = None
) -> ConversionResult:
    """Dispatch a conversion to the named tool."""
    dispatch = {
        "pymupdf4llm": convert_with_pymupdf4llm,
        "markitdown": convert_with_markitdown,
        "pandoc": convert_with_pandoc,
    }
    try:
        converter = dispatch[tool]
    except KeyError:
        return ConversionResult(
            markdown="", tool=tool, success=False, error=f"Unknown tool: {tool}"
        )
    return converter(file_path, assets_dir)
def merge_results(results: list[ConversionResult]) -> ConversionResult:
    """Merge results from multiple tools, selecting the best output.

    The highest-scoring successful markdown (per ``score_markdown``) is
    used as the body; images from every successful result are combined,
    de-duplicated in first-seen order.

    A *new* ConversionResult is returned — the original version mutated
    the winning input result in place (``best.images`` / ``best.tool``),
    silently corrupting the caller's results list.

    Args:
        results: Per-tool conversion results (may be empty).

    Returns:
        Merged ConversionResult, or a failed result when no tool
        produced non-empty markdown (the first failure is propagated so
        its error message survives).
    """
    if not results:
        return ConversionResult(markdown="", tool="none", success=False)
    # Only keep tools that succeeded and produced non-blank markdown.
    successful = [r for r in results if r.success and r.markdown.strip()]
    if not successful:
        return results[0]
    if len(successful) == 1:
        return successful[0]
    # Multiple candidates: pick the highest-scoring markdown.
    # max() keeps the first on ties, matching the original strict-greater scan.
    best = max(successful, key=lambda r: score_markdown(r.markdown))
    # Combine images from all successful results, de-duplicated in order.
    seen: set[str] = set()
    all_images: list[str] = []
    for result in successful:
        for img in result.images:
            if img not in seen:
                seen.add(img)
                all_images.append(img)
    return ConversionResult(
        markdown=best.markdown,
        tool=f"merged({','.join(r.tool for r in successful)})",
        images=all_images,
        success=True,
    )
def score_markdown(md: str) -> float:
    """Score markdown quality for comparing tool outputs.

    Heuristic components, each capped so no single one dominates:
    content length, markdown tables, image references, heading
    hierarchy, and list structure.  Higher is better; the absolute
    value is only meaningful for comparison.

    Fixes over the original: the unused ``h3_count`` was removed, and
    headings/lists on the very first line are now counted (the plain
    ``"\\n# "`` scan missed them).
    """
    score = 0.0
    # Length (more retained content is generally better), capped at 5.
    score += min(len(md) / 10000, 5.0)
    # Tables (proper markdown header separators).
    table_count = md.count("|---|") + md.count("| ---")
    score += min(table_count * 0.5, 3.0)
    # Referenced images.
    image_count = md.count("![")
    score += min(image_count * 0.3, 2.0)
    # Headings: prefix a newline so a heading on line 1 is counted too.
    body = "\n" + md
    h1_count = body.count("\n# ")
    h2_count = body.count("\n## ")
    if h1_count > 0 and h2_count >= h1_count:
        score += 1.0  # sensible hierarchy: at least as many H2s as H1s
    # Lists (structured content).
    list_count = body.count("\n- ") + body.count("\n* ") + body.count("\n1. ")
    score += min(list_count * 0.1, 2.0)
    return score
def main():
    """CLI entry point: select tools, run conversions, write the output.

    Fixes over the original: the ``--list-tools`` status marker used the
    empty string for *both* branches (the ✓/✗ glyphs were evidently
    lost), and the output file is now written with an explicit UTF-8
    encoding so results are stable on platforms whose locale encoding
    is not UTF-8 (e.g. Windows).
    """
    parser = argparse.ArgumentParser(
        description="Convert documents to markdown with multi-tool orchestration",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Quick mode (default)
  python convert.py document.pdf -o output.md
  # Heavy mode (best quality)
  python convert.py document.pdf -o output.md --heavy
  # With custom assets directory
  python convert.py document.pdf -o output.md --assets-dir ./images
""",
    )
    parser.add_argument("input", type=Path, help="Input document path")
    parser.add_argument(
        "-o", "--output", type=Path, help="Output markdown file"
    )
    parser.add_argument(
        "--heavy",
        action="store_true",
        help="Enable Heavy Mode (multi-tool, best quality)",
    )
    parser.add_argument(
        "--assets-dir",
        type=Path,
        default=None,
        help="Directory for extracted images (default: <output>_assets/)",
    )
    parser.add_argument(
        "--tool",
        choices=["pymupdf4llm", "markitdown", "pandoc"],
        help="Force specific tool (overrides auto-selection)",
    )
    parser.add_argument(
        "--list-tools",
        action="store_true",
        help="List available tools and exit",
    )
    args = parser.parse_args()

    # List tools mode
    if args.list_tools:
        print("Available conversion tools:")
        for tool in ["pymupdf4llm", "markitdown", "pandoc"]:
            status = "✓" if check_tool_available(tool) else "✗"
            print(f"  {status} {tool}")
        sys.exit(0)

    # Validate input
    if not args.input.exists():
        print(f"Error: Input file not found: {args.input}", file=sys.stderr)
        sys.exit(1)

    # Determine output path
    output_path = args.output or args.input.with_suffix(".md")
    # Heavy mode always extracts images; default next to the output file.
    assets_dir = args.assets_dir
    if assets_dir is None and args.heavy:
        assets_dir = output_path.parent / f"{output_path.stem}_assets"

    # Select tools
    mode = "heavy" if args.heavy else "quick"
    if args.tool:
        tools = [args.tool] if check_tool_available(args.tool) else []
    else:
        tools = select_tools(args.input, mode)
    if not tools:
        print("Error: No conversion tools available.", file=sys.stderr)
        print("Install with:", file=sys.stderr)
        print("  pip install pymupdf4llm", file=sys.stderr)
        print("  uv tool install markitdown[pdf]", file=sys.stderr)
        print("  brew install pandoc", file=sys.stderr)
        sys.exit(1)

    print(f"Converting: {args.input}")
    print(f"Mode: {mode.upper()}")
    print(f"Tools: {', '.join(tools)}")

    # Run conversions
    results = []
    for tool in tools:
        print(f"  Running {tool}...", end=" ", flush=True)
        # Separate per-tool asset dirs in heavy mode so tools don't
        # overwrite each other's extracted images.
        tool_assets = None
        if assets_dir and mode == "heavy" and len(tools) > 1:
            tool_assets = assets_dir / tool
        elif assets_dir:
            tool_assets = assets_dir
        result = convert_single(args.input, tool, tool_assets)
        results.append(result)
        if result.success:
            print(f"✓ ({len(result.markdown):,} chars, {len(result.images)} images)")
        else:
            print(f"✗ ({result.error[:50]}...)")

    # Merge results if heavy mode
    if mode == "heavy" and len(results) > 1:
        print("  Merging results...", end=" ", flush=True)
        final = merge_results(results)
        print(f"✓ (using {final.tool})")
    else:
        final = merge_results(results)
    if not final.success:
        print(f"Error: Conversion failed: {final.error}", file=sys.stderr)
        sys.exit(1)

    # Write output (explicit UTF-8 for cross-platform stability).
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(final.markdown, encoding="utf-8")
    print(f"\nOutput: {output_path}")
    print(f"  Size: {len(final.markdown):,} characters")
    if final.images:
        print(f"  Images: {len(final.images)} extracted")
# Script entry point; allows the module to also be imported without side effects.
if __name__ == "__main__":
    main()

0
markdown-tools/scripts/convert_path.py Normal file → Executable file
View File

232
markdown-tools/scripts/extract_pdf_images.py Normal file → Executable file
View File

@@ -1,94 +1,242 @@
#!/usr/bin/env python3
"""
Extract images from PDF files using PyMuPDF.
Extract images from PDF files with metadata using PyMuPDF.
Features:
- Extracts all images with page and position metadata
- Generates JSON metadata file for each image
- Supports markdown reference generation
- Optional DPI control for quality
Usage:
uv run --with pymupdf python extract_pdf_images.py <pdf_path> [output_dir]
uv run --with pymupdf scripts/extract_pdf_images.py document.pdf
uv run --with pymupdf scripts/extract_pdf_images.py document.pdf -o ./images
uv run --with pymupdf scripts/extract_pdf_images.py document.pdf --markdown refs.md
Examples:
uv run --with pymupdf python extract_pdf_images.py document.pdf
uv run --with pymupdf python extract_pdf_images.py document.pdf ./assets
# Basic extraction
uv run --with pymupdf scripts/extract_pdf_images.py document.pdf
Output:
Images are saved to output_dir (default: ./assets) with names like:
- img_page1_1.png
- img_page2_1.png
# With custom output and markdown references
uv run --with pymupdf scripts/extract_pdf_images.py doc.pdf -o assets --markdown images.md
"""
import argparse
import json
import sys
import os
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
def extract_images(pdf_path: str, output_dir: str = "assets") -> list[str]:
@dataclass
class ImageMetadata:
"""Metadata for an extracted image."""
filename: str
page: int # 1-indexed
index: int # Image index on page (1-indexed)
width: int # Original width in pixels
height: int # Original height in pixels
x: float # X position on page (points)
y: float # Y position on page (points)
bbox_width: float # Width on page (points)
bbox_height: float # Height on page (points)
size_bytes: int
format: str # png, jpg, etc.
colorspace: str # RGB, CMYK, Gray
bits_per_component: int
def extract_images(
pdf_path: Path,
output_dir: Path,
markdown_file: Optional[Path] = None
) -> list[ImageMetadata]:
"""
Extract all images from a PDF file.
Extract all images from a PDF file with metadata.
Args:
pdf_path: Path to the PDF file
output_dir: Directory to save extracted images
markdown_file: Optional path to write markdown references
Returns:
List of extracted image file paths
List of ImageMetadata for each extracted image
"""
try:
import fitz # PyMuPDF
except ImportError:
print("Error: PyMuPDF not installed. Run with:")
print(' uv run --with pymupdf python extract_pdf_images.py <pdf_path>')
print(' uv run --with pymupdf scripts/extract_pdf_images.py <pdf_path>')
sys.exit(1)
os.makedirs(output_dir, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)
doc = fitz.open(pdf_path)
extracted_files = []
doc = fitz.open(str(pdf_path))
extracted: list[ImageMetadata] = []
markdown_refs: list[str] = []
for page_num in range(len(doc)):
page = doc[page_num]
image_list = page.get_images()
image_list = page.get_images(full=True)
for img_index, img_info in enumerate(image_list):
xref = img_info[0]
try:
base_image = doc.extract_image(xref)
except Exception as e:
print(f" Warning: Could not extract image xref={xref}: {e}")
continue
for img_index, img in enumerate(image_list):
xref = img[0]
base_image = doc.extract_image(xref)
image_bytes = base_image["image"]
image_ext = base_image["ext"]
width = base_image.get("width", 0)
height = base_image.get("height", 0)
colorspace = base_image.get("colorspace", 0)
bpc = base_image.get("bpc", 8)
# Map colorspace number to name
cs_names = {1: "Gray", 3: "RGB", 4: "CMYK"}
cs_name = cs_names.get(colorspace, f"Unknown({colorspace})")
# Get image position on page
# img_info: (xref, smask, width, height, bpc, colorspace, alt, name, filter, referencer)
# We need to find the image rect on page
bbox_x, bbox_y, bbox_w, bbox_h = 0.0, 0.0, 0.0, 0.0
# Search for image instances on page
for img_block in page.get_images():
if img_block[0] == xref:
# Found matching image, try to get its rect
rects = page.get_image_rects(img_block)
if rects:
rect = rects[0] # Use first occurrence
bbox_x = rect.x0
bbox_y = rect.y0
bbox_w = rect.width
bbox_h = rect.height
break
# Create descriptive filename
img_filename = f"img_page{page_num + 1}_{img_index + 1}.{image_ext}"
img_path = os.path.join(output_dir, img_filename)
img_path = output_dir / img_filename
# Save image
with open(img_path, "wb") as f:
f.write(image_bytes)
extracted_files.append(img_path)
print(f"Extracted: {img_filename} ({len(image_bytes):,} bytes)")
# Create metadata
metadata = ImageMetadata(
filename=img_filename,
page=page_num + 1,
index=img_index + 1,
width=width,
height=height,
x=round(bbox_x, 2),
y=round(bbox_y, 2),
bbox_width=round(bbox_w, 2),
bbox_height=round(bbox_h, 2),
size_bytes=len(image_bytes),
format=image_ext,
colorspace=cs_name,
bits_per_component=bpc
)
extracted.append(metadata)
# Generate markdown reference
alt_text = f"Image from page {page_num + 1}"
md_ref = f"![{alt_text}]({img_path.name})"
markdown_refs.append(f"<!-- Page {page_num + 1}, Position: ({bbox_x:.0f}, {bbox_y:.0f}) -->\n{md_ref}")
print(f"{img_filename} ({width}x{height}, {len(image_bytes):,} bytes)")
doc.close()
print(f"\nTotal: {len(extracted_files)} images extracted to {output_dir}/")
return extracted_files
# Write metadata JSON
metadata_path = output_dir / "images_metadata.json"
with open(metadata_path, "w") as f:
json.dump(
{
"source": str(pdf_path),
"image_count": len(extracted),
"images": [asdict(m) for m in extracted]
},
f,
indent=2
)
print(f"\n📋 Metadata: {metadata_path}")
# Write markdown references if requested
if markdown_file and markdown_refs:
markdown_content = f"# Images from {pdf_path.name}\n\n"
markdown_content += "\n\n".join(markdown_refs)
markdown_file.parent.mkdir(parents=True, exist_ok=True)
markdown_file.write_text(markdown_content)
print(f"📝 Markdown refs: {markdown_file}")
print(f"\n✅ Total: {len(extracted)} images extracted to {output_dir}/")
return extracted
def main():
if len(sys.argv) < 2 or sys.argv[1] in ("-h", "--help"):
print("Extract images from PDF files using PyMuPDF.")
print()
print("Usage: python extract_pdf_images.py <pdf_path> [output_dir]")
print()
print("Arguments:")
print(" pdf_path Path to the PDF file")
print(" output_dir Directory to save images (default: ./assets)")
print()
print("Example:")
print(" uv run --with pymupdf python extract_pdf_images.py document.pdf ./assets")
sys.exit(0 if "--help" in sys.argv or "-h" in sys.argv else 1)
parser = argparse.ArgumentParser(
description="Extract images from PDF files with metadata",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic extraction
uv run --with pymupdf scripts/extract_pdf_images.py document.pdf
pdf_path = sys.argv[1]
output_dir = sys.argv[2] if len(sys.argv) > 2 else "assets"
# Custom output directory
uv run --with pymupdf scripts/extract_pdf_images.py doc.pdf -o ./images
if not os.path.exists(pdf_path):
print(f"Error: File not found: {pdf_path}")
# With markdown references
uv run --with pymupdf scripts/extract_pdf_images.py doc.pdf --markdown refs.md
Output:
Images are saved with descriptive names: img_page1_1.png, img_page2_1.jpg
Metadata is saved to: images_metadata.json
"""
)
parser.add_argument(
"pdf_path",
type=Path,
help="Path to the PDF file"
)
parser.add_argument(
"-o", "--output",
type=Path,
default=Path("assets"),
help="Directory to save images (default: ./assets)"
)
parser.add_argument(
"--markdown",
type=Path,
help="Generate markdown file with image references"
)
parser.add_argument(
"--json",
action="store_true",
help="Output metadata as JSON to stdout"
)
args = parser.parse_args()
if not args.pdf_path.exists():
print(f"Error: File not found: {args.pdf_path}", file=sys.stderr)
sys.exit(1)
extract_images(pdf_path, output_dir)
print(f"📄 Extracting images from: {args.pdf_path}")
extracted = extract_images(
args.pdf_path,
args.output,
args.markdown
)
if args.json:
print(json.dumps([asdict(m) for m in extracted], indent=2))
if __name__ == "__main__":

View File

@@ -0,0 +1,439 @@
#!/usr/bin/env python3
"""
Multi-tool markdown output merger with segment-level comparison.
Merges markdown outputs from multiple conversion tools by selecting
the best version of each segment (tables, images, headings, paragraphs).
Usage:
python merge_outputs.py output1.md output2.md -o merged.md
python merge_outputs.py --from-json results.json -o merged.md
"""
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class Segment:
    """A segment of markdown content."""
    type: str  # 'heading', 'table', 'image', 'list', 'paragraph', 'code'
    content: str  # raw markdown text of the segment (stripped)
    level: int = 0  # heading level from the number of #'s; 0 for non-headings
    score: float = 0.0  # quality score assigned later by score_segment()
@dataclass
class MergeResult:
    """Result from merging multiple markdown files."""
    markdown: str  # the merged markdown document
    sources: list[str] = field(default_factory=list)  # names of contributing sources
    segment_sources: dict = field(default_factory=dict)  # segment_idx -> source name that won that slot
def parse_segments(markdown: str) -> list[Segment]:
    """Parse markdown into typed segments.

    Walks the text line by line, classifying consecutive runs of lines
    into one of: 'code' (fenced blocks), 'heading', 'table', 'image',
    'list', or 'paragraph'.  Each run is emitted as one Segment; blank
    lines terminate paragraphs and lists, and headings/images are
    always single-line segments.
    """
    segments = []
    lines = markdown.split('\n')
    current_segment = []
    current_type = 'paragraph'
    current_level = 0
    in_code_block = False
    in_table = False

    def flush_segment():
        # Emit the accumulated lines (if any) as one Segment, then reset
        # the accumulator back to a fresh paragraph state.
        nonlocal current_segment, current_type, current_level
        if current_segment:
            content = '\n'.join(current_segment).strip()
            if content:
                segments.append(Segment(
                    type=current_type,
                    content=content,
                    level=current_level
                ))
        current_segment = []
        current_type = 'paragraph'
        current_level = 0

    for line in lines:
        # Code block detection: a ``` fence either opens or closes a block.
        if line.startswith('```'):
            if in_code_block:
                # Closing fence belongs to the code segment.
                current_segment.append(line)
                flush_segment()
                in_code_block = False
                continue
            else:
                flush_segment()
                in_code_block = True
                current_type = 'code'
                current_segment.append(line)
                continue
        if in_code_block:
            # Inside a fence, every line is code verbatim.
            current_segment.append(line)
            continue
        # Heading detection
        heading_match = re.match(r'^(#{1,6})\s+(.+)$', line)
        if heading_match:
            flush_segment()
            current_type = 'heading'
            current_level = len(heading_match.group(1))
            current_segment.append(line)
            flush_segment()  # headings are single-line segments
            continue
        # Table detection: a |...| row starts or continues a table.
        if '|' in line and re.match(r'^\s*\|.*\|\s*$', line):
            if not in_table:
                flush_segment()
                in_table = True
                current_type = 'table'
            current_segment.append(line)
            continue
        elif in_table:
            # First non-table line ends the table.
            flush_segment()
            in_table = False
        # Image detection (single-line segments).
        if re.match(r'!\[.*\]\(.*\)', line):
            flush_segment()
            current_type = 'image'
            current_segment.append(line)
            flush_segment()
            continue
        # List detection (bulleted or numbered items).
        if re.match(r'^[\s]*[-*+]\s+', line) or re.match(r'^[\s]*\d+\.\s+', line):
            if current_type != 'list':
                flush_segment()
                current_type = 'list'
            current_segment.append(line)
            continue
        elif current_type == 'list' and line.strip() == '':
            # Blank line terminates the current list.
            flush_segment()
            continue
        # Empty line - potential paragraph break
        if line.strip() == '':
            if current_type == 'paragraph' and current_segment:
                flush_segment()
            continue
        # Default: paragraph
        if current_type not in ['list']:
            current_type = 'paragraph'
        current_segment.append(line)
    flush_segment()
    return segments
def score_segment(segment: Segment) -> float:
"""Score a segment for quality comparison."""
score = 0.0
content = segment.content
if segment.type == 'table':
# Count rows and columns
rows = [l for l in content.split('\n') if '|' in l]
if rows:
cols = rows[0].count('|') - 1
score += len(rows) * 0.5 # More rows = better
score += cols * 0.3 # More columns = better
# Penalize separator-only tables
if all(re.match(r'^[\s|:-]+$', r) for r in rows):
score -= 5.0
# Bonus for proper header separator
if len(rows) > 1 and re.match(r'^[\s|:-]+$', rows[1]):
score += 1.0
elif segment.type == 'heading':
# Prefer proper heading hierarchy
score += 1.0
# Penalize very long headings
if len(content) > 100:
score -= 0.5
elif segment.type == 'image':
# Prefer images with alt text
if re.search(r'!\[.+\]', content):
score += 1.0
# Prefer local paths over base64
if 'data:image' not in content:
score += 0.5
elif segment.type == 'list':
items = re.findall(r'^[\s]*[-*+\d.]+\s+', content, re.MULTILINE)
score += len(items) * 0.3
# Bonus for nested lists
if re.search(r'^\s{2,}[-*+]', content, re.MULTILINE):
score += 0.5
elif segment.type == 'code':
lines = content.split('\n')
score += min(len(lines) * 0.2, 3.0)
# Bonus for language specification
if re.match(r'^```\w+', content):
score += 0.5
else: # paragraph
words = len(content.split())
score += min(words * 0.05, 2.0)
# Penalize very short paragraphs
if words < 5:
score -= 0.5
return score
def find_matching_segment(
    segment: Segment,
    candidates: list[Segment],
    used_indices: set
) -> Optional[int]:
    """Find the index of the best-matching candidate, or None.

    A candidate must share the segment's type, not already be in
    ``used_indices``, and exceed a minimum word-overlap similarity of
    0.3.  Headings are compared with the leading #'s stripped; tables
    are compared on their header (first) row.
    """
    best_index: Optional[int] = None
    best_similarity = 0.3  # minimum threshold to count as a match

    for idx, candidate in enumerate(candidates):
        if idx in used_indices or candidate.type != segment.type:
            continue
        if segment.type == 'heading':
            # Compare heading text only (ignore the # markers).
            left = re.sub(r'^#+\s*', '', segment.content).lower()
            right = re.sub(r'^#+\s*', '', candidate.content).lower()
            similarity = _text_similarity(left, right)
        elif segment.type == 'table':
            # Tables match on their header row.
            left = segment.content.split('\n')[0] if segment.content else ''
            right = candidate.content.split('\n')[0] if candidate.content else ''
            similarity = _text_similarity(left, right)
        else:
            similarity = _text_similarity(segment.content, candidate.content)
        if similarity > best_similarity:
            best_similarity = similarity
            best_index = idx
    return best_index
def _text_similarity(s1: str, s2: str) -> float:
"""Calculate simple text similarity (Jaccard on words)."""
if not s1 or not s2:
return 0.0
words1 = set(s1.lower().split())
words2 = set(s2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1 & words2)
union = len(words1 | words2)
return intersection / union if union > 0 else 0.0
def merge_markdown_files(
    files: list[Path],
    source_names: Optional[list[str]] = None
) -> MergeResult:
    """Merge multiple markdown files by selecting the best segments.

    The first file provides the document structure; each of its
    segments is replaced by a higher-scoring matching segment from a
    secondary file when one exists.  Segments that appear only in
    secondary files (score > 0.5) are appended at the end.

    Fix over the original: the ``used`` index set was recreated empty
    on every base iteration and never populated, so one secondary
    segment could win multiple base slots.  Consumed indices are now
    tracked per source.  The dead ``base_used`` variable was removed.

    Args:
        files: Markdown files to merge (first file is the base).
        source_names: Optional labels for attribution (default: stems).

    Returns:
        MergeResult with merged markdown and per-segment attribution.
    """
    if not files:
        return MergeResult(markdown="", sources=[])
    if source_names is None:
        source_names = [f.stem for f in files]

    # Parse and score every file's segments.
    all_segments = []
    for name, file_path in zip(source_names, files):
        segments = parse_segments(file_path.read_text())
        for seg in segments:
            seg.score = score_segment(seg)
        all_segments.append((name, segments))

    if len(all_segments) == 1:
        return MergeResult(
            markdown=files[0].read_text(),
            sources=[source_names[0]]
        )

    # Use the first file as the base structure.
    base_name, base_segments = all_segments[0]
    merged_segments = []
    segment_sources = {}
    # Track consumed indices per secondary source so a segment cannot
    # be matched into more than one base slot.
    used_by_source = {name: set() for name, _ in all_segments[1:]}

    for i, base_seg in enumerate(base_segments):
        best_segment = base_seg
        best_source = base_name
        best_index = None
        for other_name, other_segments in all_segments[1:]:
            match_idx = find_matching_segment(
                base_seg, other_segments, used_by_source[other_name]
            )
            if match_idx is not None:
                other_seg = other_segments[match_idx]
                if other_seg.score > best_segment.score:
                    best_segment = other_seg
                    best_source = other_name
                    best_index = match_idx
        if best_index is not None:
            used_by_source[best_source].add(best_index)
        merged_segments.append(best_segment)
        segment_sources[i] = best_source

    # Append content that exists only in secondary sources.
    for other_name, other_segments in all_segments[1:]:
        for other_seg in other_segments:
            match_idx = find_matching_segment(other_seg, base_segments, set())
            if match_idx is None and other_seg.score > 0.5:
                merged_segments.append(other_seg)
                segment_sources[len(merged_segments) - 1] = other_name

    # Reconstruct markdown from the winning segments.
    merged_md = '\n\n'.join(seg.content for seg in merged_segments)
    return MergeResult(
        markdown=merged_md,
        sources=source_names,
        segment_sources=segment_sources
    )
def merge_from_json(json_path: Path) -> MergeResult:
    """Merge from a JSON results file (produced by convert.py).

    Expects a payload of the form
    ``{"results": [{"tool": ..., "markdown": ..., "success": ...}, ...]}``.
    Mirrors the base-structure merge logic of merge_markdown_files(),
    reading tool outputs from JSON instead of files.
    """
    with open(json_path) as f:
        data = json.load(f)
    results = data.get('results', [])
    if not results:
        return MergeResult(markdown="", sources=[])
    # Keep only tools that succeeded and produced markdown.
    successful = [r for r in results if r.get('success') and r.get('markdown')]
    if not successful:
        return MergeResult(markdown="", sources=[])
    if len(successful) == 1:
        # Single survivor: nothing to merge.
        return MergeResult(
            markdown=successful[0]['markdown'],
            sources=[successful[0]['tool']]
        )
    # Parse and score each tool's output into segments.
    all_segments = []
    for result in successful:
        tool = result['tool']
        segments = parse_segments(result['markdown'])
        for seg in segments:
            seg.score = score_segment(seg)
        all_segments.append((tool, segments))
    # Same merge logic as merge_markdown_files: the first tool's output
    # is the base; each base segment is swapped for a higher-scoring match.
    base_name, base_segments = all_segments[0]
    merged_segments = []
    segment_sources = {}
    for i, base_seg in enumerate(base_segments):
        best_segment = base_seg
        best_source = base_name
        for other_name, other_segments in all_segments[1:]:
            match_idx = find_matching_segment(base_seg, other_segments, set())
            if match_idx is not None:
                other_seg = other_segments[match_idx]
                if other_seg.score > best_segment.score:
                    best_segment = other_seg
                    best_source = other_name
        merged_segments.append(best_segment)
        segment_sources[i] = best_source
    merged_md = '\n\n'.join(seg.content for seg in merged_segments)
    return MergeResult(
        markdown=merged_md,
        sources=[r['tool'] for r in successful],
        segment_sources=segment_sources
    )
def main():
    """CLI entry point: merge markdown files (or a JSON results file).

    Fix over the original: the merged output is written with an
    explicit UTF-8 encoding so results are stable on platforms whose
    locale encoding is not UTF-8 (e.g. Windows).
    """
    parser = argparse.ArgumentParser(
        description="Merge markdown outputs from multiple conversion tools"
    )
    parser.add_argument(
        "inputs",
        nargs="*",
        type=Path,
        help="Input markdown files to merge"
    )
    parser.add_argument(
        "-o", "--output",
        type=Path,
        help="Output merged markdown file"
    )
    parser.add_argument(
        "--from-json",
        type=Path,
        help="Merge from JSON results file (from convert.py)"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Show segment source attribution"
    )
    args = parser.parse_args()

    if args.from_json:
        result = merge_from_json(args.from_json)
    elif args.inputs:
        # Fail fast on any missing input before doing work.
        for f in args.inputs:
            if not f.exists():
                print(f"Error: File not found: {f}", file=sys.stderr)
                sys.exit(1)
        result = merge_markdown_files(args.inputs)
    else:
        parser.error("Either input files or --from-json is required")

    if not result.markdown:
        print("Error: No content to merge", file=sys.stderr)
        sys.exit(1)

    # Output: write to a file when requested, otherwise to stdout.
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(result.markdown, encoding="utf-8")
        print(f"Merged output: {args.output}")
        print(f"Sources: {', '.join(result.sources)}")
    else:
        print(result.markdown)

    # Attribution goes to stderr so stdout stays clean markdown.
    if args.verbose and result.segment_sources:
        print("\n--- Segment Attribution ---", file=sys.stderr)
        for idx, source in result.segment_sources.items():
            print(f"  Segment {idx}: {source}", file=sys.stderr)
# Script entry point; allows the module to also be imported without side effects.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,466 @@
#!/usr/bin/env python3
"""
Quality validator for document-to-markdown conversion.
Compare original document with converted markdown to assess conversion quality.
Generates HTML quality report with detailed metrics.
Usage:
uv run --with pymupdf scripts/validate_output.py document.pdf output.md
uv run --with pymupdf scripts/validate_output.py document.pdf output.md --report report.html
"""
import argparse
import html
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class ValidationMetrics:
    """Quality metrics for conversion validation."""
    # Text metrics
    source_char_count: int = 0  # characters extracted from the source document
    output_char_count: int = 0  # characters in the converted markdown
    text_retention: float = 0.0  # presumably output/source ratio — confirm against scoring code
    # Table metrics
    source_table_count: int = 0
    output_table_count: int = 0
    table_retention: float = 0.0
    # Image metrics
    source_image_count: int = 0
    output_image_count: int = 0
    image_retention: float = 0.0
    # Structure metrics (counted on the markdown output)
    heading_count: int = 0
    list_count: int = 0
    code_block_count: int = 0
    # Quality scores
    overall_score: float = 0.0
    status: str = "unknown"  # pass, warn, fail
    # Details
    warnings: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)
def extract_text_from_pdf(pdf_path: Path) -> tuple[str, int, int]:
    """Extract text, table count, and image count from a PDF.

    Uses PyMuPDF when available, otherwise falls back to the
    ``pdftotext`` CLI (which cannot count tables or images).  Returns
    ``("", 0, 0)`` when neither backend is usable.

    Fix over the original: the table heuristic's second operand was
    ``'' in page_text`` — always True for any string — which made
    *every* page count as a table.  Presumably a box-drawing character
    was lost; the check now looks for box-drawing characters explicitly.
    """
    try:
        import fitz  # PyMuPDF

        doc = fitz.open(str(pdf_path))
        text_parts = []
        table_count = 0
        image_count = 0
        for page in doc:
            text_parts.append(page.get_text())
            # Count images
            image_count += len(page.get_images())
            # Estimate tables (approximate — tables are hard to detect
            # in PDFs): repeated tab-separated runs, or box-drawing
            # characters used to rule table grids.
            page_text = page.get_text()
            if (re.search(r'(\t.*){2,}', page_text)
                    or re.search(r'[│┃┌┐└┘├┤┬┴┼]', page_text)):
                table_count += 1
        doc.close()
        return '\n'.join(text_parts), table_count, image_count
    except ImportError:
        # Fallback to pdftotext if available
        try:
            result = subprocess.run(
                ['pdftotext', '-layout', str(pdf_path), '-'],
                capture_output=True,
                text=True,
                timeout=60
            )
            return result.stdout, 0, 0  # can't count tables/images here
        except Exception:
            return "", 0, 0
def extract_text_from_docx(docx_path: Path) -> tuple[str, int, int]:
    """Extract text, table count, and image count from DOCX.

    Reads the OOXML package directly with the standard library (zipfile +
    ElementTree), so no third-party dependency is needed. Returns
    ("", 0, 0) for files that are not valid DOCX packages (e.g. legacy
    binary .doc files, which can also be routed here by the caller).
    """
    try:
        import zipfile
        from xml.etree import ElementTree as ET
        with zipfile.ZipFile(docx_path, 'r') as z:
            # Extract main document text
            if 'word/document.xml' not in z.namelist():
                return "", 0, 0
            with z.open('word/document.xml') as f:
                tree = ET.parse(f)
            root = tree.getroot()
            ns = {'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}
            # Collect the text of every <w:t> run in document order.
            text_parts = []
            for t in root.iter(f'{{{ns["w"]}}}t'):
                if t.text:
                    text_parts.append(t.text)
            # Count tables
            table_count = len(root.findall('.//w:tbl', ns))
            # Count images (any media file packaged under word/media/)
            image_count = sum(1 for name in z.namelist()
                              if name.startswith('word/media/'))
            return ' '.join(text_parts), table_count, image_count
    except Exception:
        # Corrupt/unreadable package: report "nothing extracted" rather
        # than crashing the validator. (The previous `as e` binding was
        # unused and has been removed.)
        return "", 0, 0
def analyze_markdown(md_path: Path) -> dict:
    """Analyze markdown file structure and content.

    Returns a dict with table/image/heading/list/code-block counts plus
    ``clean_text`` (markup stripped) for character-count comparison
    against the source document.
    """
    # Read as UTF-8 explicitly so results don't depend on locale defaults.
    content = md_path.read_text(encoding='utf-8')
    # Count tables: each run of consecutive pipe-delimited lines is one
    # table. (A previously computed `table_lines` list was unused and
    # has been removed.)
    table_count = 0
    in_table = False
    for line in content.split('\n'):
        if re.match(r'^\s*\|.*\|', line):
            if not in_table:
                table_count += 1
            in_table = True
        else:
            in_table = False
    # Count images
    images = re.findall(r'!\[.*?\]\(.*?\)', content)
    # Count headings
    headings = re.findall(r'^#{1,6}\s+.+$', content, re.MULTILINE)
    # Count list items (bulleted and numbered)
    list_items = re.findall(r'^[\s]*[-*+]\s+', content, re.MULTILINE)
    list_items += re.findall(r'^[\s]*\d+\.\s+', content, re.MULTILINE)
    # Count code blocks (each fenced block has an opening and closing ```)
    code_blocks = re.findall(r'```', content)
    # Clean text for comparison: strip fenced code, images, links, and
    # markdown punctuation, then collapse whitespace.
    clean_text = re.sub(r'```.*?```', '', content, flags=re.DOTALL)
    clean_text = re.sub(r'!\[.*?\]\(.*?\)', '', clean_text)
    clean_text = re.sub(r'\[.*?\]\(.*?\)', '', clean_text)
    clean_text = re.sub(r'[#*_`|>-]', '', clean_text)
    clean_text = re.sub(r'\s+', ' ', clean_text).strip()
    return {
        'char_count': len(clean_text),
        'table_count': table_count,
        'image_count': len(images),
        'heading_count': len(headings),
        'list_count': len(list_items),
        'code_block_count': len(code_blocks) // 2,
        'raw_content': content,
        'clean_text': clean_text
    }
def validate_conversion(
    source_path: Path,
    output_path: Path
) -> ValidationMetrics:
    """Validate conversion quality by comparing source and output.

    Computes text/table/image retention ratios (each capped at 1.0),
    records warnings/errors against fixed thresholds, and derives an
    overall score out of 100 (text weighted 50%, tables 25%, images 25%).
    Any error sets status "fail"; warnings alone set "warn"; otherwise
    "pass".
    """
    metrics = ValidationMetrics()
    # Analyze output markdown
    md_analysis = analyze_markdown(output_path)
    metrics.output_char_count = md_analysis['char_count']
    metrics.output_table_count = md_analysis['table_count']
    metrics.output_image_count = md_analysis['image_count']
    metrics.heading_count = md_analysis['heading_count']
    metrics.list_count = md_analysis['list_count']
    metrics.code_block_count = md_analysis['code_block_count']
    # Extract source content based on file type
    ext = source_path.suffix.lower()
    if ext == '.pdf':
        source_text, source_tables, source_images = extract_text_from_pdf(source_path)
    elif ext in ['.docx', '.doc']:
        source_text, source_tables, source_images = extract_text_from_docx(source_path)
    else:
        # For other formats we have no extractor; retention falls back to
        # the "no source data" defaults below.
        source_text = ""
        source_tables = 0
        source_images = 0
        metrics.warnings.append(f"Cannot analyze source format: {ext}")
    # Strip whitespace so the source count is roughly comparable to the
    # cleaned markdown text count.
    metrics.source_char_count = len(source_text.replace(' ', '').replace('\n', ''))
    metrics.source_table_count = source_tables
    metrics.source_image_count = source_images
    # Calculate retention rates
    if metrics.source_char_count > 0:
        # Use ratio of actual/expected, capped at 1.0
        metrics.text_retention = min(
            metrics.output_char_count / metrics.source_char_count,
            1.0
        )
    else:
        metrics.text_retention = 1.0 if metrics.output_char_count > 0 else 0.0
    if metrics.source_table_count > 0:
        metrics.table_retention = min(
            metrics.output_table_count / metrics.source_table_count,
            1.0
        )
    else:
        metrics.table_retention = 1.0  # No tables expected
    if metrics.source_image_count > 0:
        metrics.image_retention = min(
            metrics.output_image_count / metrics.source_image_count,
            1.0
        )
    else:
        metrics.image_retention = 1.0  # No images expected
    # Determine status based on thresholds
    if metrics.text_retention < 0.85:
        metrics.errors.append(f"Low text retention: {metrics.text_retention:.1%}")
    elif metrics.text_retention < 0.95:
        metrics.warnings.append(f"Text retention below optimal: {metrics.text_retention:.1%}")
    if metrics.source_table_count > 0 and metrics.table_retention < 0.9:
        metrics.errors.append(f"Tables missing: {metrics.table_retention:.1%} retained")
    elif metrics.source_table_count > 0 and metrics.table_retention < 1.0:
        metrics.warnings.append(f"Some tables may be incomplete: {metrics.table_retention:.1%}")
    if metrics.source_image_count > 0 and metrics.image_retention < 0.8:
        metrics.errors.append(f"Images missing: {metrics.image_retention:.1%} retained")
    elif metrics.source_image_count > 0 and metrics.image_retention < 1.0:
        metrics.warnings.append(f"Some images missing: {metrics.image_retention:.1%}")
    # Calculate overall score (0-100): text 50%, tables 25%, images 25%.
    # BUG FIX: the weights previously summed to 100 and were then
    # multiplied by 100 again, yielding scores up to 10000 even though
    # the report and console output display the score out of 100.
    metrics.overall_score = (
        metrics.text_retention * 0.5 +
        metrics.table_retention * 0.25 +
        metrics.image_retention * 0.25
    ) * 100
    # Determine status
    if metrics.errors:
        metrics.status = "fail"
    elif metrics.warnings:
        metrics.status = "warn"
    else:
        metrics.status = "pass"
    return metrics
def generate_html_report(
    metrics: ValidationMetrics,
    source_path: Path,
    output_path: Path
) -> str:
    """Generate HTML quality report.

    Returns a self-contained HTML document (inline CSS only, no external
    assets) showing the overall score, per-metric retention bars, a
    structure summary, and any recorded errors/warnings.
    """
    # Color keyed on overall status; grey fallback for "unknown".
    status_colors = {
        "pass": "#28a745",
        "warn": "#ffc107",
        "fail": "#dc3545"
    }
    status_color = status_colors.get(metrics.status, "#6c757d")
    def metric_bar(value: float, thresholds: tuple) -> str:
        """Generate colored progress bar.

        ``thresholds`` is (green_min, yellow_min): values >= green_min
        render green, >= yellow_min yellow, anything lower red.
        """
        pct = int(value * 100)
        if value >= thresholds[0]:
            color = "#28a745"  # green
        elif value >= thresholds[1]:
            color = "#ffc107"  # yellow
        else:
            color = "#dc3545"  # red
        return f'''
        <div style="background: #e9ecef; border-radius: 4px; overflow: hidden; height: 20px;">
            <div style="background: {color}; width: {pct}%; height: 100%; transition: width 0.3s;"></div>
        </div>
        <span style="font-size: 14px; color: #666;">{pct}%</span>
        '''
    # Paths are escaped with html.escape; metric numbers are formatted
    # directly since they are program-generated.
    report = f'''<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Conversion Quality Report</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif; margin: 40px; background: #f5f5f5; }}
.container {{ max-width: 800px; margin: 0 auto; background: white; padding: 30px; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
h1 {{ color: #333; border-bottom: 2px solid #eee; padding-bottom: 15px; }}
.status {{ display: inline-block; padding: 8px 16px; border-radius: 4px; color: white; font-weight: bold; }}
.metric {{ margin: 20px 0; padding: 15px; background: #f8f9fa; border-radius: 4px; }}
.metric-label {{ font-weight: bold; color: #333; margin-bottom: 8px; }}
.metric-value {{ font-size: 24px; color: #333; }}
.issues {{ margin-top: 20px; }}
.error {{ background: #f8d7da; color: #721c24; padding: 10px; margin: 5px 0; border-radius: 4px; }}
.warning {{ background: #fff3cd; color: #856404; padding: 10px; margin: 5px 0; border-radius: 4px; }}
table {{ width: 100%; border-collapse: collapse; margin: 15px 0; }}
th, td {{ padding: 10px; text-align: left; border-bottom: 1px solid #eee; }}
th {{ background: #f8f9fa; }}
.score {{ font-size: 48px; font-weight: bold; color: {status_color}; }}
</style>
</head>
<body>
<div class="container">
<h1>📊 Conversion Quality Report</h1>
<div style="text-align: center; margin: 30px 0;">
<div class="score">{metrics.overall_score:.0f}</div>
<div style="color: #666;">Overall Score</div>
<div class="status" style="background: {status_color}; margin-top: 10px;">
{metrics.status.upper()}
</div>
</div>
<h2>📄 File Information</h2>
<table>
<tr><th>Source</th><td>{html.escape(str(source_path))}</td></tr>
<tr><th>Output</th><td>{html.escape(str(output_path))}</td></tr>
</table>
<h2>📏 Retention Metrics</h2>
<div class="metric">
<div class="metric-label">Text Retention (target: >95%)</div>
{metric_bar(metrics.text_retention, (0.95, 0.85))}
<div style="font-size: 12px; color: #666; margin-top: 5px;">
Source: ~{metrics.source_char_count:,} chars | Output: {metrics.output_char_count:,} chars
</div>
</div>
<div class="metric">
<div class="metric-label">Table Retention (target: 100%)</div>
{metric_bar(metrics.table_retention, (1.0, 0.9))}
<div style="font-size: 12px; color: #666; margin-top: 5px;">
Source: {metrics.source_table_count} tables | Output: {metrics.output_table_count} tables
</div>
</div>
<div class="metric">
<div class="metric-label">Image Retention (target: 100%)</div>
{metric_bar(metrics.image_retention, (1.0, 0.8))}
<div style="font-size: 12px; color: #666; margin-top: 5px;">
Source: {metrics.source_image_count} images | Output: {metrics.output_image_count} images
</div>
</div>
<h2>📊 Structure Analysis</h2>
<table>
<tr><th>Headings</th><td>{metrics.heading_count}</td></tr>
<tr><th>List Items</th><td>{metrics.list_count}</td></tr>
<tr><th>Code Blocks</th><td>{metrics.code_block_count}</td></tr>
</table>
{'<h2>⚠️ Issues</h2><div class="issues">' + ''.join(f'<div class="error">❌ {html.escape(e)}</div>' for e in metrics.errors) + ''.join(f'<div class="warning">⚠️ {html.escape(w)}</div>' for w in metrics.warnings) + '</div>' if metrics.errors or metrics.warnings else ''}
<div style="margin-top: 30px; padding-top: 20px; border-top: 1px solid #eee; color: #666; font-size: 12px;">
Generated by markdown-tools validate_output.py
</div>
</div>
</body>
</html>
'''
    return report
def main():
    """CLI entry point: validate a conversion and report the results.

    Prints a console summary (or JSON with --json), optionally writes an
    HTML report, and exits 0 for "pass"/"warn" status, 1 for "fail" or
    missing input files.
    """
    parser = argparse.ArgumentParser(
        description="Validate document-to-markdown conversion quality"
    )
    parser.add_argument(
        "source",
        type=Path,
        help="Original document (PDF, DOCX, etc.)"
    )
    parser.add_argument(
        "output",
        type=Path,
        help="Converted markdown file"
    )
    parser.add_argument(
        "--report",
        type=Path,
        help="Generate HTML report at this path"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output metrics as JSON"
    )
    args = parser.parse_args()
    # Validate inputs
    if not args.source.exists():
        print(f"Error: Source file not found: {args.source}", file=sys.stderr)
        sys.exit(1)
    if not args.output.exists():
        print(f"Error: Output file not found: {args.output}", file=sys.stderr)
        sys.exit(1)
    # Run validation
    metrics = validate_conversion(args.source, args.output)
    # Output results
    if args.json:
        import json
        print(json.dumps({
            'text_retention': metrics.text_retention,
            'table_retention': metrics.table_retention,
            'image_retention': metrics.image_retention,
            'overall_score': metrics.overall_score,
            'status': metrics.status,
            'warnings': metrics.warnings,
            'errors': metrics.errors
        }, indent=2))
    else:
        # Console output. FIX: restore the pass/fail emoji (previously
        # empty strings) so errors are marked like warnings are.
        status_emoji = {"pass": "✅", "warn": "⚠️", "fail": "❌"}.get(metrics.status, "")
        print(f"\n{status_emoji} Conversion Quality: {metrics.status.upper()}")
        print(f"   Overall Score: {metrics.overall_score:.0f}/100")
        print(f"\n   Text Retention:  {metrics.text_retention:.1%}")
        print(f"   Table Retention: {metrics.table_retention:.1%}")
        print(f"   Image Retention: {metrics.image_retention:.1%}")
        if metrics.errors:
            print("\n   Errors:")
            for e in metrics.errors:
                print(f"     ❌ {e}")
        if metrics.warnings:
            print("\n   Warnings:")
            for w in metrics.warnings:
                print(f"     ⚠️ {w}")
    # Generate HTML report
    if args.report:
        report_html = generate_html_report(metrics, args.source, args.output)
        args.report.parent.mkdir(parents=True, exist_ok=True)
        # BUG FIX: the report contains emoji and declares a UTF-8
        # charset; write it explicitly as UTF-8 so this does not raise
        # UnicodeEncodeError (or mis-encode) under non-UTF-8 locales.
        args.report.write_text(report_html, encoding='utf-8')
        print(f"\n📊 HTML report: {args.report}")
    # Exit with appropriate code
    sys.exit(0 if metrics.status != "fail" else 1)
if __name__ == "__main__":
    main()