Files
daymade 143995b213 refactor: rename markdown-tools → doc-to-markdown (v2.0.0)
- Rename skill to better reflect its purpose (document-to-markdown conversion)
- Update SKILL.md name, description, and trigger keywords
- Add benchmark reference (2026-03-22)
- Update marketplace.json entry (name, skills path, version 2.0.0)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-23 00:06:30 +08:00

440 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Multi-tool markdown output merger with segment-level comparison.
Merges markdown outputs from multiple conversion tools by selecting
the best version of each segment (tables, images, headings, paragraphs).
Usage:
python merge_outputs.py output1.md output2.md -o merged.md
python merge_outputs.py --from-json results.json -o merged.md
"""
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class Segment:
    """A typed segment of markdown content."""

    type: str  # one of: 'heading', 'table', 'image', 'list', 'paragraph', 'code'
    content: str  # raw markdown text of the segment (outer whitespace stripped)
    level: int = 0  # heading depth (1-6); 0 for non-headings
    score: float = 0.0  # quality score; 0.0 until assigned by score_segment()


@dataclass
class MergeResult:
    """Result from merging multiple markdown files."""

    markdown: str  # the merged markdown document
    sources: list[str] = field(default_factory=list)  # names of contributing sources
    segment_sources: dict = field(default_factory=dict)  # segment_idx -> source name


def parse_segments(markdown: str) -> list[Segment]:
    """Parse markdown into a flat list of typed segments.

    A line-oriented state machine classifies each run of lines as one of:
    code block, heading, table, image, list, or paragraph.

    Args:
        markdown: Full markdown document text.

    Returns:
        Segments in document order; whitespace-only segments are dropped.
    """
    segments = []
    lines = markdown.split('\n')
    current_segment = []
    current_type = 'paragraph'
    current_level = 0
    in_code_block = False
    in_table = False

    def flush_segment():
        # Emit accumulated lines (if any) as one segment and reset state.
        # Also clears in_table: a flush always ends any table in progress.
        # (Fixes stale state when a heading or code fence directly follows
        # a table, which previously caused the *next* table to be
        # accumulated as a paragraph because the table branch saw
        # in_table=True and skipped re-typing the segment.)
        nonlocal current_segment, current_type, current_level, in_table
        if current_segment:
            content = '\n'.join(current_segment).strip()
            if content:
                segments.append(Segment(
                    type=current_type,
                    content=content,
                    level=current_level
                ))
        current_segment = []
        current_type = 'paragraph'
        current_level = 0
        in_table = False

    for line in lines:
        # Code block detection: ``` fences toggle in_code_block; fenced
        # content is kept verbatim.
        if line.startswith('```'):
            if in_code_block:
                current_segment.append(line)
                flush_segment()
                in_code_block = False
                continue
            else:
                flush_segment()
                in_code_block = True
                current_type = 'code'
                current_segment.append(line)
                continue
        if in_code_block:
            current_segment.append(line)
            continue
        # Heading detection: 1-6 leading '#' followed by text.
        heading_match = re.match(r'^(#{1,6})\s+(.+)$', line)
        if heading_match:
            flush_segment()
            current_type = 'heading'
            current_level = len(heading_match.group(1))
            current_segment.append(line)
            flush_segment()
            continue
        # Table detection: a line bracketed by pipes.
        if '|' in line and re.match(r'^\s*\|.*\|\s*$', line):
            if not in_table:
                flush_segment()
                in_table = True
                current_type = 'table'
            current_segment.append(line)
            continue
        elif in_table:
            # First non-table line ends the table; fall through so this
            # line can still be classified below.
            flush_segment()
            in_table = False
        # Image detection: a standalone ![alt](src) line.
        if re.match(r'!\[.*\]\(.*\)', line):
            flush_segment()
            current_type = 'image'
            current_segment.append(line)
            flush_segment()
            continue
        # List detection: bullet (-, *, +) or numbered (1.) items.
        if re.match(r'^[\s]*[-*+]\s+', line) or re.match(r'^[\s]*\d+\.\s+', line):
            if current_type != 'list':
                flush_segment()
                current_type = 'list'
            current_segment.append(line)
            continue
        elif current_type == 'list' and line.strip() == '':
            # Blank line terminates a list.
            flush_segment()
            continue
        # Empty line - potential paragraph break.
        if line.strip() == '':
            if current_type == 'paragraph' and current_segment:
                flush_segment()
            continue
        # Default: accumulate as paragraph text.
        if current_type not in ['list']:
            current_type = 'paragraph'
        current_segment.append(line)
    flush_segment()
    return segments
def score_segment(segment: Segment) -> float:
    """Score a segment for quality comparison.

    Higher is better. Each segment type has its own heuristic, so scores
    are only meaningful relative to other segments of the same type.
    """
    text = segment.content
    kind = segment.type

    if kind == 'table':
        # Reward size; punish tables made only of separator rows.
        pipe_rows = [row for row in text.split('\n') if '|' in row]
        if not pipe_rows:
            return 0.0
        total = len(pipe_rows) * 0.5  # more rows = better
        total += (pipe_rows[0].count('|') - 1) * 0.3  # more columns = better
        separator = re.compile(r'^[\s|:-]+$')
        if all(separator.match(row) for row in pipe_rows):
            total -= 5.0  # separator-only table carries no data
        if len(pipe_rows) > 1 and separator.match(pipe_rows[1]):
            total += 1.0  # well-formed header separator
        return total

    if kind == 'heading':
        # Flat bonus; very long headings are suspect.
        return 1.0 - (0.5 if len(text) > 100 else 0.0)

    if kind == 'image':
        total = 0.0
        if re.search(r'!\[.+\]', text):  # non-empty alt text
            total += 1.0
        if 'data:image' not in text:  # local path beats inline base64
            total += 0.5
        return total

    if kind == 'list':
        markers = re.findall(r'^[\s]*[-*+\d.]+\s+', text, re.MULTILINE)
        total = len(markers) * 0.3
        if re.search(r'^\s{2,}[-*+]', text, re.MULTILINE):
            total += 0.5  # nesting bonus
        return total

    if kind == 'code':
        total = min(len(text.split('\n')) * 0.2, 3.0)
        if re.match(r'^```\w+', text):
            total += 0.5  # language tag on the opening fence
        return total

    # Paragraph (and any unknown type): favour substance, lightly
    # penalize fragments.
    word_count = len(text.split())
    total = min(word_count * 0.05, 2.0)
    if word_count < 5:
        total -= 0.5
    return total
def find_matching_segment(
    segment: Segment,
    candidates: list[Segment],
    used_indices: set
) -> Optional[int]:
    """Find the best-matching segment in ``candidates``.

    Only candidates of the same type are considered; indices present in
    ``used_indices`` are skipped. Similarity must strictly exceed 0.3.

    Returns:
        Index of the best match within ``candidates``, or None.
    """
    best_idx = None
    bar = 0.3  # minimum similarity threshold; raised as matches are found

    for idx, cand in enumerate(candidates):
        if idx in used_indices or cand.type != segment.type:
            continue
        if segment.type == 'heading':
            # Headings compare by their text, ignoring '#' markers and case.
            left = re.sub(r'^#+\s*', '', segment.content).lower()
            right = re.sub(r'^#+\s*', '', cand.content).lower()
            sim = _text_similarity(left, right)
        elif segment.type == 'table':
            # Tables compare by their header (first) row only.
            left = segment.content.split('\n')[0] if segment.content else ''
            right = cand.content.split('\n')[0] if cand.content else ''
            sim = _text_similarity(left, right)
        else:
            # Everything else compares full content.
            sim = _text_similarity(segment.content, cand.content)
        if sim > bar:
            bar = sim
            best_idx = idx
    return best_idx
def _text_similarity(s1: str, s2: str) -> float:
    """Jaccard similarity of the lowercase word sets of two strings."""
    if not s1 or not s2:
        return 0.0
    left = set(s1.lower().split())
    right = set(s2.lower().split())
    if not left or not right:
        # Whitespace-only strings produce empty word sets.
        return 0.0
    union_size = len(left | right)
    return len(left & right) / union_size if union_size else 0.0
def merge_markdown_files(
    files: list[Path],
    source_names: Optional[list[str]] = None
) -> MergeResult:
    """Merge multiple markdown files by selecting best segments.

    The first file provides the document structure; for each of its
    segments, the highest-scoring matching segment across all files is
    selected. High-scoring segments that appear only in secondary files
    are appended at the end.

    Args:
        files: Markdown files to merge (first file is the base).
        source_names: Optional display names parallel to ``files``;
            defaults to each file's stem.

    Returns:
        MergeResult with merged markdown and per-segment attribution.
    """
    if not files:
        return MergeResult(markdown="", sources=[])
    if source_names is None:
        source_names = [f.stem for f in files]

    # Parse each file into segments and score every segment.
    all_segments = []
    for i, file_path in enumerate(files):
        content = file_path.read_text()
        segments = parse_segments(content)
        for seg in segments:
            seg.score = score_segment(seg)
        all_segments.append((source_names[i], segments))

    if len(all_segments) == 1:
        # Single input: pass the original text through untouched.
        return MergeResult(
            markdown=files[0].read_text(),
            sources=[source_names[0]]
        )

    # Use first file as base structure.
    base_name, base_segments = all_segments[0]
    merged_segments = []
    segment_sources = {}
    # Track which indices of each secondary source have been consumed, so
    # one secondary segment cannot replace several base segments.
    # (Previously a fresh empty set was created per base segment and never
    # populated, defeating find_matching_segment's used_indices parameter.)
    used_by_source = {name: set() for name, _ in all_segments[1:]}

    for i, base_seg in enumerate(base_segments):
        best_segment = base_seg
        best_source = base_name
        best_match_idx = None
        # Look for a higher-scoring match in every other file.
        for other_name, other_segments in all_segments[1:]:
            match_idx = find_matching_segment(
                base_seg, other_segments, used_by_source[other_name]
            )
            if match_idx is not None:
                other_seg = other_segments[match_idx]
                if other_seg.score > best_segment.score:
                    best_segment = other_seg
                    best_source = other_name
                    best_match_idx = match_idx
        if best_source != base_name and best_match_idx is not None:
            used_by_source[best_source].add(best_match_idx)
        merged_segments.append(best_segment)
        segment_sources[i] = best_source

    # Append content that only appears in secondary sources.
    for other_name, other_segments in all_segments[1:]:
        for other_seg in other_segments:
            match_idx = find_matching_segment(other_seg, base_segments, set())
            if match_idx is None and other_seg.score > 0.5:
                # This segment doesn't exist in base - add it at the end.
                merged_segments.append(other_seg)
                segment_sources[len(merged_segments) - 1] = other_name

    # Reconstruct markdown with blank lines between segments.
    merged_md = '\n\n'.join(seg.content for seg in merged_segments)
    return MergeResult(
        markdown=merged_md,
        sources=source_names,
        segment_sources=segment_sources
    )
def merge_from_json(json_path: Path) -> MergeResult:
    """Merge tool outputs recorded in a JSON results file (from convert.py).

    Expects the shape ``{"results": [{"tool": ..., "success": ...,
    "markdown": ...}, ...]}``. Only successful results with non-empty
    markdown participate in the merge.

    Args:
        json_path: Path to the JSON results file.

    Returns:
        MergeResult; empty markdown when there are no successful results.
    """
    with open(json_path) as f:
        data = json.load(f)
    results = data.get('results', [])
    if not results:
        return MergeResult(markdown="", sources=[])

    # Keep only successful conversions that actually produced markdown.
    successful = [r for r in results if r.get('success') and r.get('markdown')]
    if not successful:
        return MergeResult(markdown="", sources=[])
    if len(successful) == 1:
        return MergeResult(
            markdown=successful[0]['markdown'],
            sources=[successful[0]['tool']]
        )

    # Parse and score each tool's output.
    all_segments = []
    for result in successful:
        tool = result['tool']
        segments = parse_segments(result['markdown'])
        for seg in segments:
            seg.score = score_segment(seg)
        all_segments.append((tool, segments))

    # Same selection logic as merge_markdown_files: the first tool's
    # output provides the structure; best-scoring matches win.
    # NOTE(review): unlike merge_markdown_files, segments that exist only
    # in secondary tools are NOT appended here — confirm whether that
    # asymmetry is intentional.
    base_name, base_segments = all_segments[0]
    merged_segments = []
    segment_sources = {}
    # Track consumed indices per tool so one segment cannot replace
    # several base segments. (The used_indices parameter was previously
    # always passed an empty set, so duplicates were possible.)
    used_by_tool = {name: set() for name, _ in all_segments[1:]}

    for i, base_seg in enumerate(base_segments):
        best_segment = base_seg
        best_source = base_name
        best_match_idx = None
        for other_name, other_segments in all_segments[1:]:
            match_idx = find_matching_segment(
                base_seg, other_segments, used_by_tool[other_name]
            )
            if match_idx is not None:
                other_seg = other_segments[match_idx]
                if other_seg.score > best_segment.score:
                    best_segment = other_seg
                    best_source = other_name
                    best_match_idx = match_idx
        if best_source != base_name and best_match_idx is not None:
            used_by_tool[best_source].add(best_match_idx)
        merged_segments.append(best_segment)
        segment_sources[i] = best_source

    merged_md = '\n\n'.join(seg.content for seg in merged_segments)
    return MergeResult(
        markdown=merged_md,
        sources=[r['tool'] for r in successful],
        segment_sources=segment_sources
    )
def main():
    """CLI entry point: parse arguments, merge inputs, emit output."""
    parser = argparse.ArgumentParser(
        description="Merge markdown outputs from multiple conversion tools"
    )
    parser.add_argument(
        "inputs", nargs="*", type=Path,
        help="Input markdown files to merge"
    )
    parser.add_argument(
        "-o", "--output", type=Path,
        help="Output merged markdown file"
    )
    parser.add_argument(
        "--from-json", type=Path,
        help="Merge from JSON results file (from convert.py)"
    )
    parser.add_argument(
        "--verbose", action="store_true",
        help="Show segment source attribution"
    )
    args = parser.parse_args()

    # Pick the merge mode: a JSON results file takes priority over raw
    # markdown inputs; neither present is a usage error.
    if args.from_json:
        result = merge_from_json(args.from_json)
    elif args.inputs:
        # Validate that every input file exists before merging.
        for path in args.inputs:
            if not path.exists():
                print(f"Error: File not found: {path}", file=sys.stderr)
                sys.exit(1)
        result = merge_markdown_files(args.inputs)
    else:
        parser.error("Either input files or --from-json is required")

    if not result.markdown:
        print("Error: No content to merge", file=sys.stderr)
        sys.exit(1)

    # Write to the requested file, or dump the merge to stdout.
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(result.markdown)
        print(f"Merged output: {args.output}")
        print(f"Sources: {', '.join(result.sources)}")
    else:
        print(result.markdown)

    # Attribution report goes to stderr so stdout stays clean markdown.
    if args.verbose and result.segment_sources:
        print("\n--- Segment Attribution ---", file=sys.stderr)
        for idx, source in result.segment_sources.items():
            print(f" Segment {idx}: {source}", file=sys.stderr)


if __name__ == "__main__":
    main()