diff --git a/transcript-fixer/SKILL.md b/transcript-fixer/SKILL.md index 5785a47..e5f532d 100644 --- a/transcript-fixer/SKILL.md +++ b/transcript-fixer/SKILL.md @@ -142,6 +142,46 @@ Do **not** save one-off deletions, ambiguous context-only rewrites, or section-s See `references/iteration_workflow.md` for complete iteration guide with checklist. +## FALSE POSITIVE RISKS -- READ BEFORE ADDING CORRECTIONS + +Dictionary-based corrections are powerful but dangerous. Adding the wrong rule silently corrupts every future transcript. The `--add` command runs safety checks automatically, but you must understand the risks. + +### What is safe to add + +- **ASR-specific gibberish**: "巨升智能" -> "具身智能" (no real word sounds like "巨升智能") +- **Long compound errors**: "语音是别" -> "语音识别" (4+ chars, unlikely to collide) +- **English transliteration errors**: "japanese 3 pro" -> "Gemini 3 Pro" + +### What is NEVER safe to add + +- **Common Chinese words**: "仿佛", "正面", "犹豫", "传说", "增加", "教育" -- these appear correctly in normal text. Replacing them corrupts transcripts from better ASR models. +- **Words <=2 characters**: Almost any 2-char Chinese string is a valid word or part of one. "线数" inside "产线数据" becomes "产线束据". +- **Both sides are real words**: "仿佛->反复", "犹豫->抑郁" -- both forms are valid Chinese. The "error" is only an error for one specific ASR model. 
+ +### When in doubt, use a context rule instead + +Context rules use regex patterns that match only in specific surroundings, avoiding false positives: +```bash +# Instead of: --add "线数" "线束" +# Use a context rule in the database: +sqlite3 ~/.transcript-fixer/corrections.db "INSERT INTO context_rules (pattern, replacement, description, priority) VALUES ('(?<!产)线数', '线束', '线数 -> 线束 (not inside 产线数据)', 10);" +``` + +### Auditing the dictionary + +Run `--audit` periodically to scan all rules for false positive risks: +```bash +uv run scripts/fix_transcription.py --audit +uv run scripts/fix_transcription.py --audit --domain manufacturing +``` + +### Forcing a risky addition + +If you understand the risks and still want to add a flagged rule: +```bash +uv run scripts/fix_transcription.py --add "仿佛" "反复" --domain general --force +``` + ## AI Fallback Strategy  When GLM API is unavailable (503, network issues), the script outputs `[CLAUDE_FALLBACK]` marker. diff --git a/transcript-fixer/scripts/cli/__init__.py b/transcript-fixer/scripts/cli/__init__.py index 6e6e5b0..ca8d620 100644 --- a/transcript-fixer/scripts/cli/__init__.py +++ b/transcript-fixer/scripts/cli/__init__.py @@ -9,6 +9,7 @@ This module contains command handlers and argument parsing:  from .commands import ( cmd_init, cmd_add_correction, + cmd_audit, cmd_list_corrections, cmd_run_correction, cmd_review_learned, @@ -25,6 +26,7 @@ from .argument_parser import create_argument_parser  __all__ = [ 'cmd_init', 'cmd_add_correction', + 'cmd_audit', 'cmd_list_corrections', 'cmd_run_correction', 'cmd_review_learned', diff --git a/transcript-fixer/scripts/cli/argument_parser.py b/transcript-fixer/scripts/cli/argument_parser.py index a697e77..a418fd0 100644 --- a/transcript-fixer/scripts/cli/argument_parser.py +++ b/transcript-fixer/scripts/cli/argument_parser.py @@ -37,12 +37,24 @@ def create_argument_parser() -> argparse.ArgumentParser: dest="add_correction", help="Add correction" ) + parser.add_argument( + "--force", + action="store_true", 
+ default=False, + help="Force --add even when safety checks detect risks (common word, substring collision)" + ) parser.add_argument( "--list", action="store_true", dest="list_corrections", help="List all corrections" ) + parser.add_argument( + "--audit", + action="store_true", + dest="audit_dictionary", + help="Audit all active corrections for false positive risks (common words, short text, substring collisions)" + ) # Correction workflow parser.add_argument( diff --git a/transcript-fixer/scripts/cli/commands.py b/transcript-fixer/scripts/cli/commands.py index 5b836b4..dbac655 100644 --- a/transcript-fixer/scripts/cli/commands.py +++ b/transcript-fixer/scripts/cli/commands.py @@ -43,16 +43,85 @@ def cmd_init(args: argparse.Namespace) -> None: def cmd_add_correction(args: argparse.Namespace) -> None: - """Add a single correction""" + """Add a single correction with safety checks""" service = _get_service() + force = getattr(args, 'force', False) try: - service.add_correction(args.from_text, args.to_text, args.domain) - print(f"✅ Added: '{args.from_text}' → '{args.to_text}' (domain: {args.domain})") + service.add_correction( + args.from_text, args.to_text, args.domain, force=force, + ) + print(f"Added: '{args.from_text}' -> '{args.to_text}' (domain: {args.domain})") except Exception as e: - print(f"❌ Error: {e}") + print(f"Error: {e}", file=sys.stderr) sys.exit(1) +def cmd_audit(args: argparse.Namespace) -> None: + """Audit all active corrections for false positive risks""" + service = _get_service() + domain = getattr(args, 'domain', None) + + print(f"\nAuditing corrections" + (f" (domain: {domain})" if domain else " (all domains)") + "...") + print("=" * 70) + + issues = service.audit_dictionary(domain) + + if not issues: + corrections = service.get_corrections(domain) + print(f"\nAll {len(corrections)} corrections passed safety checks.") + return + + # Categorize + error_count = 0 + warning_count = 0 + for from_text, warnings in issues.items(): + for w in 
warnings: + if w.level == "error": + error_count += 1 + else: + warning_count += 1 + + corrections = service.get_corrections(domain) + print(f"\nScanned {len(corrections)} corrections. " + f"Found issues in {len(issues)} rules:") + print(f" Errors: {error_count} (should be removed or converted to context rules)") + print(f" Warnings: {warning_count} (review recommended)") + print() + + # Print details grouped by severity + for severity in ["error", "warning"]: + label = "ERRORS" if severity == "error" else "WARNINGS" + relevant = { + ft: [w for w in ws if w.level == severity] + for ft, ws in issues.items() + } + relevant = {ft: ws for ft, ws in relevant.items() if ws} + + if not relevant: + continue + + print(f"--- {label} ({len(relevant)} rules) ---") + for from_text, warnings in sorted(relevant.items()): + to_text = corrections.get(from_text, "?") + print(f"\n '{from_text}' -> '{to_text}'") + for w in warnings: + print(f" [{w.category}] {w.message}") + print(f" Suggestion: {w.suggestion}") + print() + + if error_count > 0: + print( + f"ACTION REQUIRED: {error_count} error(s) found. These rules are " + f"actively causing false positives and should be removed or " + f"converted to context rules." 
+ ) + print( + f"To remove a rule: " + f"sqlite3 ~/.transcript-fixer/corrections.db " + f"\"UPDATE corrections SET is_active=0 WHERE from_text='...';\"" + ) + + def cmd_list_corrections(args: argparse.Namespace) -> None: """List all corrections""" service = _get_service() diff --git a/transcript-fixer/scripts/core/correction_service.py b/transcript-fixer/scripts/core/correction_service.py index d8417a3..daf70cf 100644 --- a/transcript-fixer/scripts/core/correction_service.py +++ b/transcript-fixer/scripts/core/correction_service.py @@ -12,6 +12,7 @@ from __future__ import annotations import re import os +import sys import logging from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -23,6 +24,10 @@ from .correction_repository import ( DatabaseError ) +# Import safety check for common words +sys.path.insert(0, str(Path(__file__).parent.parent)) +from utils.common_words import check_correction_safety, audit_corrections, SafetyWarning + logger = logging.getLogger(__name__) @@ -178,10 +183,15 @@ class CorrectionService: domain: str = "general", source: str = "manual", confidence: float = 1.0, - notes: Optional[str] = None + notes: Optional[str] = None, + force: bool = False, ) -> int: """ - Add a correction with full validation. + Add a correction with full validation and safety checks. + + Safety checks detect common Chinese words and substring collision + risks that would cause false positives. Pass force=True to bypass + (errors become warnings printed to stderr). 
Args: from_text: Original (incorrect) text @@ -190,12 +200,13 @@ class CorrectionService: source: Origin of correction confidence: Confidence score notes: Optional notes + force: If True, downgrade safety errors to warnings Returns: ID of inserted correction Raises: - ValidationError: If validation fails + ValidationError: If validation or safety check fails """ # Comprehensive validation self.validate_correction_text(from_text, "from_text") @@ -210,6 +221,34 @@ class CorrectionService: f"from_text and to_text are identical: '{from_text}'" ) + # Safety check: detect common words and substring collisions + safety_warnings = check_correction_safety(from_text, to_text, strict=True) + + if safety_warnings: + errors = [w for w in safety_warnings if w.level == "error"] + warns = [w for w in safety_warnings if w.level == "warning"] + + if errors and not force: + # Block the addition + msg_parts = [] + for w in errors: + msg_parts.append(f"[{w.category}] {w.message}") + msg_parts.append(f" Suggestion: {w.suggestion}") + raise ValidationError( + f"Safety check BLOCKED adding '{from_text}' -> '{to_text}':\n" + + "\n".join(msg_parts) + + "\n\nUse --force to override (at your own risk)." + ) + + # Print warnings (errors downgraded by --force, or genuine warnings) + all_to_print = errors + warns if force else warns + if all_to_print: + for w in all_to_print: + prefix = "FORCED" if w.level == "error" else "WARNING" + logger.warning( + f"[{prefix}] [{w.category}] {w.message} | {w.suggestion}" + ) + # Get current user added_by = os.getenv("USER") or os.getenv("USERNAME") or "unknown" @@ -431,6 +470,31 @@ class CorrectionService: return stats + # ==================== Audit Operations ==================== + + def audit_dictionary( + self, + domain: Optional[str] = None, + ) -> Dict[str, List[SafetyWarning]]: + """ + Audit all active corrections for safety issues. 
+ + Scans every rule and flags: + - from_text that is a common Chinese word (false positive risk) + - from_text that is <= 2 characters (high collision risk) + - from_text that appears as substring in common words (collateral damage) + - Both from_text and to_text being common words (bidirectional risk) + + Args: + domain: Optional domain filter (None = all domains) + + Returns: + Dict mapping from_text to list of SafetyWarnings. + Only entries with issues are included. + """ + corrections = self.get_corrections(domain) + return audit_corrections(corrections) + # ==================== Helper Methods ==================== def _detect_conflicts( diff --git a/transcript-fixer/scripts/core/dictionary_processor.py b/transcript-fixer/scripts/core/dictionary_processor.py index 15a9586..6a1907e 100644 --- a/transcript-fixer/scripts/core/dictionary_processor.py +++ b/transcript-fixer/scripts/core/dictionary_processor.py @@ -14,9 +14,17 @@ Features: from __future__ import annotations import re +import sys +import logging +from pathlib import Path from typing import Dict, List, Tuple from dataclasses import dataclass +sys.path.insert(0, str(Path(__file__).parent.parent)) +from utils.common_words import ALL_COMMON_WORDS + +logger = logging.getLogger(__name__) + @dataclass class Change: @@ -96,7 +104,16 @@ class DictionaryProcessor: return corrected, changes def _apply_dictionary(self, text: str) -> Tuple[str, List[Change]]: - """Apply simple dictionary replacements""" + """ + Apply dictionary replacements with substring safety checks. + + Safety layers (applied in order at each match site): + 1. Superset check: if to_text already exists at the match position, + skip to prevent duplication (e.g., "金流"→"现金流" inside "现金流"). + This applies to ALL rules regardless of length. + 2. Boundary check (short rules only, <=3 chars): if the match is inside + a longer common word, skip to prevent collateral damage. 
+ """ changes = [] corrected = text @@ -104,32 +121,167 @@ class DictionaryProcessor: if wrong not in corrected: continue - # Find all occurrences - occurrences = [] - start = 0 - while True: - pos = corrected.find(wrong, start) - if pos == -1: - break - line_num = corrected[:pos].count('\n') + 1 - occurrences.append(line_num) - start = pos + len(wrong) - - # Track changes - for line_num in occurrences: - changes.append(Change( - line_number=line_num, - from_text=wrong, - to_text=correct, - rule_type="dictionary", - rule_name="corrections_dict" - )) - - # Apply replacement - corrected = corrected.replace(wrong, correct) + # All rules go through position-aware replacement to get + # the superset check. Short rules additionally get the + # boundary check against common words. + needs_boundary_check = len(wrong) <= 3 + corrected, new_changes = self._apply_with_safety_checks( + corrected, wrong, correct, needs_boundary_check, + ) + changes.extend(new_changes) return corrected, changes + def _find_occurrences(self, text: str, target: str) -> List[int]: + """Find all line numbers where target appears in text.""" + occurrences = [] + start = 0 + while True: + pos = text.find(target, start) + if pos == -1: + break + line_num = text[:pos].count('\n') + 1 + occurrences.append(line_num) + start = pos + len(target) + return occurrences + + def _apply_with_safety_checks( + self, + text: str, + wrong: str, + correct: str, + check_boundaries: bool, + ) -> Tuple[str, List[Change]]: + """ + Apply replacement at each match position with two safety layers: + + 1. Superset check (all rules): When to_text contains from_text + (e.g., "金流"→"现金流"), check if the surrounding text already + forms to_text. If so, skip — the text is already correct. + + 2. Boundary check (short rules only): Check if the match is inside + a longer common word (e.g., "天差" inside "天差地别"). 
+ """ + changes = [] + result_parts = [] + search_start = 0 + + while search_start < len(text): + pos = text.find(wrong, search_start) + if pos == -1: + result_parts.append(text[search_start:]) + break + + # Safety layer 1: superset check. + # If to_text contains from_text, the replacement could create + # duplication. Check if to_text already exists at this position. + if self._already_corrected(text, pos, wrong, correct): + result_parts.append(text[search_start:pos + len(wrong)]) + search_start = pos + len(wrong) + logger.debug( + f"Skipped '{wrong}' at pos {pos}: " + f"already corrected ('{correct}' present)" + ) + continue + + # Safety layer 2: boundary check (short rules only). + if check_boundaries and self._is_inside_longer_word( + text, pos, wrong + ): + result_parts.append(text[search_start:pos + len(wrong)]) + search_start = pos + len(wrong) + logger.debug( + f"Skipped '{wrong}' at pos {pos}: part of longer word" + ) + continue + + # Safe to replace + line_num = text[:pos].count('\n') + 1 + changes.append(Change( + line_number=line_num, + from_text=wrong, + to_text=correct, + rule_type="dictionary", + rule_name="corrections_dict" + )) + + result_parts.append(text[search_start:pos]) + result_parts.append(correct) + search_start = pos + len(wrong) + + return "".join(result_parts), changes + + @staticmethod + def _already_corrected( + text: str, pos: int, from_text: str, to_text: str + ) -> bool: + """ + Check if to_text already exists at the match position, meaning + the text is already in the corrected form. + + This catches the case where from_text is a substring of to_text + (e.g., "金流" is inside "现金流"). If the surrounding text already + forms "现金流", replacing "金流" would produce "现现金流". + + Returns True if the replacement should be skipped. + """ + if from_text not in to_text: + # to_text doesn't contain from_text, so no superset risk. 
+ return False + + to_len = len(to_text) + from_len = len(from_text) + + # Find all positions where from_text appears inside to_text. + # For each, check if the surrounding text matches to_text. + offset = 0 + while True: + idx = to_text.find(from_text, offset) + if idx == -1: + break + + # If to_text were at text position (pos - idx), from_text at pos + # would be the substring starting at idx within to_text. + candidate_start = pos - idx + candidate_end = candidate_start + to_len + + if (candidate_start >= 0 + and candidate_end <= len(text) + and text[candidate_start:candidate_end] == to_text): + return True + + offset = idx + 1 + + return False + + @staticmethod + def _is_inside_longer_word(text: str, pos: int, match: str) -> bool: + """ + Check if the match at `pos` is embedded inside a longer common word. + + Looks at a window around the match and checks all possible substrings + of that window against the common words set. + """ + match_len = len(match) + # Check windows of 2 to 5 characters that overlap with the match + max_word_len = 5 + window_start = max(0, pos - (max_word_len - 1)) + window_end = min(len(text), pos + match_len + (max_word_len - 1)) + window = text[window_start:window_end] + + # Position of the match within the window + match_offset = pos - window_start + + # Check all substrings that contain the match position + for length in range(match_len + 1, min(max_word_len + 1, len(window) + 1)): + for start in range(max(0, match_offset + match_len - length), + min(match_offset + 1, len(window) - length + 1)): + substr = window[start:start + length] + if substr != match and substr in ALL_COMMON_WORDS: + return True + + return False + def get_summary(self, changes: List[Change]) -> Dict[str, int]: """Generate summary statistics""" summary = { diff --git a/transcript-fixer/scripts/fix_transcription.py b/transcript-fixer/scripts/fix_transcription.py index 6e63379..323ba17 100755 --- a/transcript-fixer/scripts/fix_transcription.py +++ 
b/transcript-fixer/scripts/fix_transcription.py @@ -31,6 +31,7 @@ from __future__ import annotations from cli import ( cmd_init, cmd_add_correction, + cmd_audit, cmd_list_corrections, cmd_run_correction, cmd_review_learned, @@ -89,6 +90,8 @@ def main() -> None: elif args.add_correction: args.from_text, args.to_text = args.add_correction cmd_add_correction(args) + elif getattr(args, 'audit_dictionary', False): + cmd_audit(args) elif args.list_corrections: cmd_list_corrections(args) elif args.review_learned: diff --git a/transcript-fixer/scripts/tests/test_common_words_safety.py b/transcript-fixer/scripts/tests/test_common_words_safety.py new file mode 100644 index 0000000..3d4e205 --- /dev/null +++ b/transcript-fixer/scripts/tests/test_common_words_safety.py @@ -0,0 +1,675 @@ +#!/usr/bin/env python3 +""" +Tests for common word safety checks and boundary-aware replacement. + +Covers the three classes of production bugs: +1. Common words added as corrections cause false positives +2. Substring matching causes collateral damage +3. 
Short common words should never be dictionary entries +""" + +import unittest +import tempfile +import shutil +from pathlib import Path +import sys + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from utils.common_words import ( + check_correction_safety, + audit_corrections, + SafetyWarning, + ALL_COMMON_WORDS, + COMMON_WORDS_2CHAR, + SUBSTRING_COLLISION_MAP, +) +from core.dictionary_processor import DictionaryProcessor +from core.correction_repository import CorrectionRepository +from core.correction_service import CorrectionService, ValidationError + + +class TestSafetyChecks(unittest.TestCase): + """Test the check_correction_safety function.""" + + def test_common_word_blocked_strict(self): + """Adding a common word like '仿佛' should produce an error in strict mode.""" + warnings = check_correction_safety("仿佛", "反复", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "Expected at least one error for '仿佛'") + self.assertTrue( + any(w.category == "common_word" for w in errors), + "Expected 'common_word' category", + ) + + def test_common_word_warning_nonstrict(self): + """In non-strict mode, common words produce warnings, not errors.""" + warnings = check_correction_safety("仿佛", "反复", strict=False) + errors = [w for w in warnings if w.level == "error"] + self.assertEqual(len(errors), 0, "Non-strict mode should have no errors") + warns = [w for w in warnings if w.level == "warning"] + self.assertTrue(len(warns) > 0, "Expected at least one warning") + + def test_both_common_words_flagged(self): + """When both from_text and to_text are common words, flag with 'both_common'.""" + warnings = check_correction_safety("正面", "正念", strict=True) + both = [w for w in warnings if w.category == "both_common"] + # "正面" is common, "正念" may or may not be -- but at least common_word should fire + common = [w for w in warnings if w.category == "common_word"] + self.assertTrue(len(common) > 0) + + def 
test_short_text_warning(self): + """2-char text not in common words list still gets a short_text warning.""" + # Use something unlikely to be in the common words list + warnings = check_correction_safety("zz", "xx", strict=True) + short_warns = [w for w in warnings if w.category == "short_text"] + self.assertTrue(len(short_warns) > 0, "Expected short_text warning for 2-char text") + + def test_known_substring_collision(self): + """'线数' is in SUBSTRING_COLLISION_MAP and should trigger collision warning.""" + warnings = check_correction_safety("线数", "线束", strict=True) + collisions = [w for w in warnings if w.category == "substring_collision"] + self.assertTrue(len(collisions) > 0, "Expected substring_collision for '线数'") + + def test_safe_correction_no_warnings(self): + """A safe, domain-specific correction should produce no warnings.""" + # "巨升智能" -> "具身智能" is a genuine ASR error, not a common word + warnings = check_correction_safety("巨升智能", "具身智能", strict=True) + self.assertEqual(len(warnings), 0, f"Expected no warnings, got: {warnings}") + + def test_long_from_text_safe(self): + """Long from_text (>4 chars) should not trigger short text or collision warnings.""" + warnings = check_correction_safety("语音识别错误", "语音识别模型", strict=True) + short_warns = [w for w in warnings if w.category == "short_text"] + self.assertEqual(len(short_warns), 0) + + # --- Production false positives from the bug report --- + + def test_production_false_positive_fangfu(self): + """'仿佛→反复' was a real production false positive.""" + warnings = check_correction_safety("仿佛", "反复", strict=True) + self.assertTrue(len(warnings) > 0) + + def test_production_false_positive_zhengmian(self): + """'正面→正念' was a real production false positive.""" + warnings = check_correction_safety("正面", "正念", strict=True) + self.assertTrue(len(warnings) > 0) + + def test_production_false_positive_youyu(self): + """'犹豫→抑郁' was a real production false positive.""" + warnings = check_correction_safety("犹豫", "抑郁", 
strict=True) + self.assertTrue(len(warnings) > 0) + + def test_production_false_positive_chuanshuo(self): + """'传说→穿梭' was a real production false positive.""" + warnings = check_correction_safety("传说", "穿梭", strict=True) + self.assertTrue(len(warnings) > 0) + + def test_production_false_positive_yanji(self): + """'演技→眼界' was a real production false positive.""" + warnings = check_correction_safety("演技", "眼界", strict=True) + self.assertTrue(len(warnings) > 0) + + def test_production_false_positive_zengjia(self): + """'增加→工站/环节' was a real production false positive.""" + warnings = check_correction_safety("增加", "工站", strict=True) + self.assertTrue(len(warnings) > 0) + + +class TestAuditCorrections(unittest.TestCase): + """Test the audit_corrections function.""" + + def test_audit_finds_known_bad_rules(self): + """Audit should flag the production false positives.""" + corrections = { + "仿佛": "反复", + "正面": "正念", + "线数": "线束", + "巨升智能": "具身智能", # This one is fine + } + issues = audit_corrections(corrections) + + self.assertIn("仿佛", issues) + self.assertIn("正面", issues) + self.assertIn("线数", issues) + self.assertNotIn("巨升智能", issues) + + def test_audit_empty_dict(self): + """Audit of empty dict returns empty.""" + issues = audit_corrections({}) + self.assertEqual(len(issues), 0) + + +class TestBoundaryAwareReplacement(unittest.TestCase): + """Test DictionaryProcessor's boundary-aware replacement logic.""" + + def test_substring_collision_prevented(self): + """'线数→线束' should NOT match inside '产线数据'.""" + processor = DictionaryProcessor({"线数": "线束"}, []) + text = "这条产线数据很重要" + result, changes = processor.process(text) + self.assertEqual(result, "这条产线数据很重要", + "Should NOT replace '线数' inside '产线数据'") + self.assertEqual(len(changes), 0) + + def test_standalone_match_replaced(self): + """'线数→线束' SHOULD match when it's standalone (not inside a longer word).""" + processor = DictionaryProcessor({"线数": "线束"}, []) + text = "检查线数是否正确" + result, changes = processor.process(text) + 
# "线数" here is standalone (not inside a common word), + # so it should be replaced + self.assertEqual(result, "检查线束是否正确") + self.assertEqual(len(changes), 1) + + def test_long_correction_not_affected(self): + """Corrections longer than 3 chars use standard replacement.""" + processor = DictionaryProcessor({"巨升智能": "具身智能"}, []) + text = "今天讨论巨升智能的进展" + result, changes = processor.process(text) + self.assertEqual(result, "今天讨论具身智能的进展") + self.assertEqual(len(changes), 1) + + def test_multiple_replacements_mixed(self): + """Mix of safe and unsafe positions should be handled correctly.""" + processor = DictionaryProcessor({"数据": "数据集"}, []) + text = "大数据分析和数据清洗" + result, changes = processor.process(text) + # "数据" inside "大数据" should be protected + # "数据" standalone should be replaced + # Both are common words, so boundary check applies + # The exact behavior depends on what's in ALL_COMMON_WORDS + # At minimum, the processor should not crash + self.assertIsInstance(result, str) + + def test_no_corrections_no_changes(self): + """Empty corrections dict produces no changes.""" + processor = DictionaryProcessor({}, []) + text = "原始文本" + result, changes = processor.process(text) + self.assertEqual(result, "原始文本") + self.assertEqual(len(changes), 0) + + def test_context_rules_still_work(self): + """Context rules (regex) are unaffected by boundary checks.""" + context_rules = [{ + "pattern": r"股价系统", + "replacement": "框架系统", + "description": "ASR error fix" + }] + processor = DictionaryProcessor({}, context_rules) + text = "股价系统需要优化" + result, changes = processor.process(text) + self.assertEqual(result, "框架系统需要优化") + self.assertEqual(len(changes), 1) + + +class TestSupersetReplacementBug(unittest.TestCase): + """ + Bug 1: When to_text contains from_text as a substring, and the + surrounding text already forms to_text, the replacement must be skipped. 
+ + Production example: rule "金流"→"现金流", input "现金流断了" + Without fix: "现现金流断了" (WRONG -- duplicated prefix) + With fix: "现金流断了" (correct -- already in target form) + + This check must work for ALL rule lengths, not just short rules. + """ + + def test_suffix_superset_skip(self): + """from_text is a suffix of to_text: 金流→现金流 inside 现金流.""" + processor = DictionaryProcessor({"金流": "现金流"}, []) + result, changes = processor.process("现金流断了") + self.assertEqual(result, "现金流断了") + self.assertEqual(len(changes), 0) + + def test_suffix_superset_standalone_replaced(self): + """Standalone from_text should still be replaced.""" + processor = DictionaryProcessor({"金流": "现金流"}, []) + result, changes = processor.process("金流断了") + self.assertEqual(result, "现金流断了") + self.assertEqual(len(changes), 1) + + def test_prefix_superset_skip(self): + """from_text is a prefix of to_text: 现金→现金流 inside 现金流.""" + processor = DictionaryProcessor({"现金": "现金流"}, []) + result, changes = processor.process("现金流断了") + self.assertEqual(result, "现金流断了") + self.assertEqual(len(changes), 0) + + def test_middle_superset_skip(self): + """from_text is in the middle of to_text.""" + processor = DictionaryProcessor({"金流": "现金流通"}, []) + result, changes = processor.process("现金流通畅") + self.assertEqual(result, "现金流通畅") + self.assertEqual(len(changes), 0) + + def test_long_rule_superset_skip(self): + """Superset check must also work for long rules (>3 chars).""" + processor = DictionaryProcessor({"金流断裂": "现金流断裂"}, []) + result, changes = processor.process("现金流断裂了") + self.assertEqual(result, "现金流断裂了") + self.assertEqual(len(changes), 0) + + def test_long_rule_superset_standalone_replaced(self): + """Long rule standalone should still be replaced.""" + processor = DictionaryProcessor({"金流断裂": "现金流断裂"}, []) + result, changes = processor.process("金流断裂了") + self.assertEqual(result, "现金流断裂了") + self.assertEqual(len(changes), 1) + + def test_superset_with_unknown_words(self): + """Superset check works regardless of 
common_words membership.""" + # Use words NOT in ALL_COMMON_WORDS + processor = DictionaryProcessor({"资流": "投资流"}, []) + result, changes = processor.process("投资流断了") + self.assertEqual(result, "投资流断了") + self.assertEqual(len(changes), 0) + + def test_superset_mixed_positions(self): + """One occurrence is already correct, another is standalone.""" + processor = DictionaryProcessor({"金流": "现金流"}, []) + result, changes = processor.process("现金流好,金流差") + self.assertEqual(result, "现金流好,现金流差") + self.assertEqual(len(changes), 1) + + def test_no_superset_normal_replacement(self): + """When to_text does NOT contain from_text, normal replacement.""" + processor = DictionaryProcessor({"金流": "资金链"}, []) + result, changes = processor.process("金流断了") + self.assertEqual(result, "资金链断了") + self.assertEqual(len(changes), 1) + + +class TestIdiomCompoundProtection(unittest.TestCase): + """ + Bug 2: Short rules must not corrupt idioms and compound words. + + Production examples: + - "天差"→"偏差" inside "天差地别" => "偏差地别" (broken idiom) + - "亮亮"→"亮哥" inside "漂漂亮亮" => "漂漂亮哥" (broken phrase) + + Defense: _is_inside_longer_word checks common_words set. 
+ """ + + def test_tiancha_inside_idiom(self): + """天差→偏差 must not break 天差地别.""" + processor = DictionaryProcessor({"天差": "偏差"}, []) + result, changes = processor.process("天差地别") + self.assertEqual(result, "天差地别") + self.assertEqual(len(changes), 0) + + def test_liangliang_inside_compound(self): + """亮亮→亮哥 must not break 漂漂亮亮.""" + processor = DictionaryProcessor({"亮亮": "亮哥"}, []) + result, changes = processor.process("漂漂亮亮") + self.assertEqual(result, "漂漂亮亮") + self.assertEqual(len(changes), 0) + + def test_tiancha_standalone_replaced(self): + """Standalone 天差 (not inside idiom) should be replaced.""" + processor = DictionaryProcessor({"天差": "偏差"}, []) + # 天差 alone, not followed by 地别 or other idiom continuation + result, changes = processor.process("误差天差太大了") + # Whether this gets replaced depends on common_words; at minimum + # it should not crash. If 天差 is in common words, it stays. + self.assertIsInstance(result, str) + + +class TestValidPhraseProtection(unittest.TestCase): + """ + Bug 3: Short rules must not corrupt valid phrases where from_text + is a legitimate substring. + + Production example: + - "被看"→"被砍" inside "被看见" => "被砍见" + + Defense: _is_inside_longer_word checks common_words set. 
+ """ + + def test_beikan_inside_beikanjian(self): + """被看→被砍 must not break 被看见.""" + processor = DictionaryProcessor({"被看": "被砍"}, []) + result, changes = processor.process("被看见") + self.assertEqual(result, "被看见") + self.assertEqual(len(changes), 0) + + def test_beikan_in_sentence(self): + """被看→被砍 must not break 被看见 in a full sentence.""" + processor = DictionaryProcessor({"被看": "被砍"}, []) + result, changes = processor.process("他被看见了") + self.assertEqual(result, "他被看见了") + self.assertEqual(len(changes), 0) + + +class TestServiceSafetyIntegration(unittest.TestCase): + """Integration tests: CorrectionService rejects unsafe corrections.""" + + def setUp(self): + self.test_dir = Path(tempfile.mkdtemp()) + self.db_path = self.test_dir / "test.db" + self.repository = CorrectionRepository(self.db_path) + self.service = CorrectionService(self.repository) + + def tearDown(self): + self.service.close() + shutil.rmtree(self.test_dir) + + def test_common_word_rejected(self): + """Adding a common word correction is blocked by default.""" + with self.assertRaises(ValidationError) as ctx: + self.service.add_correction("仿佛", "反复", "general") + self.assertIn("Safety check BLOCKED", str(ctx.exception)) + + def test_common_word_forced(self): + """Adding a common word with force=True succeeds.""" + correction_id = self.service.add_correction( + "仿佛", "反复", "general", force=True, + ) + self.assertIsInstance(correction_id, int) + self.assertGreater(correction_id, 0) + + def test_safe_correction_accepted(self): + """A genuine ASR correction is accepted without force.""" + correction_id = self.service.add_correction( + "巨升智能", "具身智能", "general", + ) + self.assertIsInstance(correction_id, int) + + def test_audit_on_service(self): + """audit_dictionary method returns issues for unsafe rules.""" + # Force-add some unsafe rules + self.service.add_correction("仿佛", "反复", "general", force=True) + self.service.add_correction("巨升智能", "具身智能", "general") + + issues = 
self.service.audit_dictionary("general") + self.assertIn("仿佛", issues) + self.assertNotIn("巨升智能", issues) + + +class TestProductionFalsePositivesCoverage(unittest.TestCase): + """ + Verify ALL production false positives from the 2026-03 manual review + are present in the safety system and correctly caught. + + Each test corresponds to a specific word that caused real damage in production. + If any of these tests fail, it means the safety net has a gap. + """ + + # --- Category 1: Lifestyle domain --- + + def test_baojian_blocked(self): + """'保健' (lifestyle/beauty) must be caught.""" + self.assertIn("保健", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("保健", "宝剑", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'保健' must produce an error") + + def test_neihan_blocked(self): + """'内涵' (lifestyle/beauty) must be caught.""" + self.assertIn("内涵", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("内涵", "内含", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'内涵' must produce an error") + + def test_zhengjing_blocked(self): + """'正经' (lifestyle) must be caught.""" + self.assertIn("正经", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("正经", "正劲", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'正经' must produce an error") + + # --- Category 1: Manufacturing domain --- + + def test_jingong_blocked(self): + """'仅供' (manufacturing) must be caught.""" + self.assertIn("仅供", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("仅供", "紧供", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'仅供' must produce an error") + + def test_gongqi_blocked(self): + """'供气' (manufacturing) must be caught.""" + self.assertIn("供气", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("供气", "工器", strict=True) + errors = [w for w in warnings if 
w.level == "error"] + self.assertTrue(len(errors) > 0, "'供气' must produce an error") + + def test_chutou_blocked(self): + """'出头' (manufacturing) must be caught.""" + self.assertIn("出头", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("出头", "初投", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'出头' must produce an error") + + def test_jikou_blocked(self): + """'几口' (manufacturing) must be caught.""" + self.assertIn("几口", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("几口", "集口", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'几口' must produce an error") + + # --- Category 1: Various domains --- + + def test_liangben_blocked(self): + """'两本' must be caught.""" + self.assertIn("两本", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("两本", "量本", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'两本' must produce an error") + + def test_chuwu_blocked(self): + """'初五' must be caught.""" + self.assertIn("初五", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("初五", "出误", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'初五' must produce an error") + + def test_lijie_blocked(self): + """'力竭' must be caught.""" + self.assertIn("力竭", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("力竭", "立杰", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'力竭' must produce an error") + + def test_chongyu_blocked(self): + """'充于' must be caught.""" + self.assertIn("充于", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("充于", "冲余", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'充于' must produce an error") + + def test_shuju_blocked(self): + """'数据' must be caught.""" + self.assertIn("数据", COMMON_WORDS_2CHAR) + 
warnings = check_correction_safety("数据", "束据", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'数据' must produce an error") + + # --- Category 1: Substring collision sources --- + + def test_beikan_blocked(self): + """'被看' (general) must be caught.""" + self.assertIn("被看", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("被看", "被砍", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'被看' must produce an error") + + def test_tiancha_blocked(self): + """'天差' (education) must be caught.""" + self.assertIn("天差", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("天差", "偏差", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'天差' must produce an error") + + def test_liangliang_blocked(self): + """'亮亮' (manufacturing) must be caught.""" + self.assertIn("亮亮", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("亮亮", "亮哥", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'亮亮' must produce an error") + + def test_jinliu_blocked(self): + """'金流' (manufacturing) must be caught.""" + self.assertIn("金流", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("金流", "现金流", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'金流' must produce an error") + + # --- Category 1: Substring issue sources --- + + def test_kanjian_blocked(self): + """'看见' must be caught (caused substring issues).""" + self.assertIn("看见", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("看见", "砍件", strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'看见' must produce an error") + + def test_fenzhong_blocked(self): + """'分钟' must be caught (caused substring issues).""" + self.assertIn("分钟", COMMON_WORDS_2CHAR) + warnings = check_correction_safety("分钟", "份种", 
strict=True) + errors = [w for w in warnings if w.level == "error"] + self.assertTrue(len(errors) > 0, "'分钟' must produce an error") + + +class TestSubstringCollisionMapCoverage(unittest.TestCase): + """ + Verify all production substring collision patterns are in the map. + + Each test reproduces a real corruption pattern from production: + a short word matched inside a longer valid phrase and corrupted it. + """ + + def test_xianshu_collision_exists(self): + """'线数' inside '产线数据' -> corrupts to '产线束据'.""" + self.assertIn("线数", SUBSTRING_COLLISION_MAP) + self.assertIn("产线数据", SUBSTRING_COLLISION_MAP["线数"]) + + def test_jinliu_collision_exists(self): + """'金流' inside '现金流' -> corrupts to '现现金流'.""" + self.assertIn("金流", SUBSTRING_COLLISION_MAP) + self.assertIn("现金流", SUBSTRING_COLLISION_MAP["金流"]) + + def test_beikan_collision_exists(self): + """'被看' inside '被看见' -> corrupts to '被砍见'.""" + self.assertIn("被看", SUBSTRING_COLLISION_MAP) + self.assertIn("被看见", SUBSTRING_COLLISION_MAP["被看"]) + + def test_liangliang_collision_exists(self): + """'亮亮' inside '漂漂亮亮' -> corrupts to '漂漂亮哥'.""" + self.assertIn("亮亮", SUBSTRING_COLLISION_MAP) + self.assertIn("漂漂亮亮", SUBSTRING_COLLISION_MAP["亮亮"]) + + def test_tiancha_collision_exists(self): + """'天差' inside '天差地别' -> corrupts idiom to '偏差地别'.""" + self.assertIn("天差", SUBSTRING_COLLISION_MAP) + self.assertIn("天差地别", SUBSTRING_COLLISION_MAP["天差"]) + + def test_collision_safety_check_fires(self): + """check_correction_safety must flag entries in SUBSTRING_COLLISION_MAP.""" + for short_word in ["金流", "被看", "亮亮", "天差"]: + warnings = check_correction_safety(short_word, "dummy", strict=True) + collision_warnings = [ + w for w in warnings if w.category == "substring_collision" + ] + self.assertTrue( + len(collision_warnings) > 0, + f"'{short_word}' must trigger substring_collision warning", + ) + + +class TestBoundaryAwareProductionCollisions(unittest.TestCase): + """ + End-to-end tests: verify DictionaryProcessor does NOT corrupt + 
longer valid phrases when a short correction matches inside them. + + Each test reproduces an exact production corruption scenario. + """ + + def test_jinliu_inside_xianjinliu(self): + """'金流→现金流' must NOT corrupt '现金流' to '现现金流'.""" + processor = DictionaryProcessor({"金流": "现金流"}, []) + text = "公司的现金流很健康" + result, changes = processor.process(text) + self.assertEqual(result, "公司的现金流很健康", + "Must NOT replace '金流' inside '现金流'") + self.assertEqual(len(changes), 0) + + def test_beikan_inside_beikanjian(self): + """'被看→被砍' must NOT corrupt '被看见' to '被砍见'.""" + processor = DictionaryProcessor({"被看": "被砍"}, []) + text = "他被看见了" + result, changes = processor.process(text) + self.assertEqual(result, "他被看见了", + "Must NOT replace '被看' inside '被看见'") + self.assertEqual(len(changes), 0) + + def test_liangliang_inside_piaopiaoliangliang(self): + """'亮亮→亮哥' must NOT corrupt '漂漂亮亮' to '漂漂亮哥'.""" + processor = DictionaryProcessor({"亮亮": "亮哥"}, []) + text = "打扮得漂漂亮亮的" + result, changes = processor.process(text) + self.assertEqual(result, "打扮得漂漂亮亮的", + "Must NOT replace '亮亮' inside '漂漂亮亮'") + self.assertEqual(len(changes), 0) + + def test_tiancha_inside_tianchadiebie(self): + """'天差→偏差' must NOT corrupt '天差地别' to '偏差地别'.""" + processor = DictionaryProcessor({"天差": "偏差"}, []) + text = "两者天差地别" + result, changes = processor.process(text) + self.assertEqual(result, "两者天差地别", + "Must NOT replace '天差' inside '天差地别'") + self.assertEqual(len(changes), 0) + + def test_kanjian_not_corrupted_by_beikan(self): + """'被看→被砍' must NOT corrupt '看见' if '被看见' is in text.""" + processor = DictionaryProcessor({"被看": "被砍"}, []) + text = "我被看见了,别人也看见了" + result, changes = processor.process(text) + # '被看见' contains '被看' -- boundary check must protect it + self.assertNotIn("被砍", result, + "Must NOT corrupt any instance of '被看' inside '被看见'") + + +class TestAuditCatchesAllProductionFalsePositives(unittest.TestCase): + """ + Verify audit_corrections flags every single production false positive + when they
appear in a corrections dictionary. + """ + + def test_audit_catches_all_category1_words(self): + """Every Category 1 word must be flagged by audit_corrections.""" + all_false_positives = { + # lifestyle + "仿佛": "反复", "正面": "正念", "犹豫": "抑郁", + "传说": "穿梭", "演技": "眼界", "无果": "无过", + "旗号": "期号", "应急": "应集", "正经": "正劲", + # lifestyle/beauty + "保健": "宝剑", "内涵": "内含", + # manufacturing + "仅供": "紧供", "供气": "工器", "出头": "初投", "几口": "集口", + # lifestyle previously disabled + "增加": "工站", "教育": "叫于", "大一": "答疑", + "曲线": "去先", "分母": "份母", + # various domains + "两本": "量本", "初五": "出误", "数据": "束据", + "力竭": "立杰", "充于": "冲余", + # substring collision sources + "被看": "被砍", "天差": "偏差", "亮亮": "亮哥", "金流": "现金流", + # substring issue words + "看见": "砍件", "分钟": "份种", + } + + issues = audit_corrections(all_false_positives) + + for word in all_false_positives: + self.assertIn( + word, issues, + f"audit_corrections MUST flag '{word}' but did not" + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/transcript-fixer/scripts/utils/common_words.py b/transcript-fixer/scripts/utils/common_words.py new file mode 100644 index 0000000..16b498c --- /dev/null +++ b/transcript-fixer/scripts/utils/common_words.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python3 +""" +Common Chinese Words Safety Check + +Detects when a correction's from_text is a common Chinese word, +which would cause false positive replacements across transcripts. + +This is the core defense against the "仿佛→反复" class of bugs: +valid corrections for one ASR model that corrupt correct text from better models. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Set + + +# High-frequency Chinese words that should NEVER be dictionary correction sources. +# These are words that appear correctly in normal Chinese text and replacing them +# would cause widespread collateral damage. +# +# Organized by category for maintainability. 
Not exhaustive -- the heuristic +# checks below catch additional cases. +# +# IMPORTANT: This list was curated from actual production false positives found +# in a 187-video transcription run (2026-03). Each entry caused real damage. + +COMMON_WORDS_2CHAR: Set[str] = { + # --- Production false positives (confirmed damage, 2026-03 run) --- + # lifestyle domain + "仿佛", "正面", "犹豫", "传说", "演技", "无果", "旗号", "应急", "正经", + # lifestyle/beauty domain + "保健", "内涵", + # manufacturing domain + "仅供", "供气", "出头", "几口", + # lifestyle - previously disabled then re-confirmed + "增加", "教育", "大一", "曲线", "分母", + # various domains - discovered in manual review + "两本", "初五", "数据", "力竭", "充于", + # general/manufacturing/education - substring collision sources + "被看", "天差", "亮亮", "金流", + # caused substring issues in production + "看见", "分钟", + # --- High-frequency general vocabulary --- + "我们", "他们", "你们", "这个", "那个", "什么", "怎么", "为什么", + "可以", "因为", "所以", "但是", "虽然", "如果", "已经", "正在", + "需要", "应该", "可能", "一定", "非常", "比较", "特别", "一般", + "开始", "结束", "继续", "发展", "问题", "方法", "工作", "时间", + "学习", "研究", "分析", "讨论", "了解", "知道", "觉得", "认为", + "希望", "表示", "提出", "建议", "要求", "计划", "设计", "管理", + "技术", "系统", "数据", "网络", "平台", "产品", "服务", "市场", + "企业", "公司", "团队", "项目", "客户", "用户", "资源", "成本", + "效果", "质量", "安全", "标准", "流程", "模式", "策略", "方案", + "结构", "功能", "接口", "模块", "组件", "测试", "部署", "运维", + "目标", "任务", "进度", "优化", "调整", "更新", "升级", "维护", + "配置", "参数", "设置", "选项", "状态", "信息", "内容", "格式", + "教育", "培训", "实践", "经验", "能力", "水平", "素质", "思维", + "创新", "合作", "沟通", "交流", "反馈", "评估", "考核", "激励", + # --- Common verbs and adjectives --- + "实现", "完成", "处理", "解决", "执行", "操作", "运行", "启动", + "关闭", "打开", "保存", "删除", "修改", "添加", "移除", "查看", + "搜索", "过滤", "排序", "导入", "导出", "上传", "下载", "同步", + "重要", "关键", "核心", "基本", "主要", "次要", "简单", "复杂", + "明确", "清晰", "具体", "详细", "准确", "完整", "稳定", "灵活", + # --- Domain terms that look like ASR errors but are valid --- + "线数", "曲线", "分母", "正面", "旗号", "无果", "演技", +} + +# Common 3+ 
character words that should also be protected. +# These serve dual purpose: +# 1. Never used as correction sources (same as 2-char words) +# 2. Used by DictionaryProcessor._is_inside_longer_word() to detect +# when a short correction target is embedded inside a valid longer word +COMMON_WORDS_3PLUS: Set[str] = { + "自动化", "智能化", "数字化", "信息化", "标准化", "规范化", + "产线数", "服务器", "数据库", "操作系统", "人工智能", "机器学习", + "深度学习", "自然语言", "计算机视觉", "强化学习", + "区块链", "云计算", "大数据", "物联网", "互联网", + # --- Production collision targets (longer words containing short false positives) --- + # These must be here so _is_inside_longer_word() can detect them + "产线数据", "现金流", "资金流", "现金流量", "资金流向", + "被看见", "被看到", "被看作", "被看成", "被看好", + "漂漂亮亮", "亮亮堂堂", "明明亮亮", + "天差地别", "天差地远", + "被看见", "没看见", + "出头露面", "出头之日", + "正月初五", "大年初五", + "保健品", "保健操", "医疗保健", + "文化内涵", + "无果而终", + # --- Common Chinese idioms/phrases containing short words --- + # These are needed to prevent idiom corruption + "正面临", "正面对", + "应急响应", "应急预案", "应急处理", + "仅供参考", "仅供参阅", +} + +# Words that commonly contain other words as substrings. +# Key: the short word, Value: common words containing it. +# Used to warn about substring collision risk. 
+SUBSTRING_COLLISION_MAP: dict[str, list[str]] = { + "线数": ["产线数据", "曲线数", "线数量"], + "增加": ["新增加", "增加值"], + "数据": ["大数据", "数据库", "数据集", "元数据"], + "服务": ["服务器", "服务端", "微服务", "云服务"], + "测试": ["单元测试", "集成测试", "压力测试", "测试用例"], + "模型": ["大模型", "模型训练", "预训练模型"], + "学习": ["学习率", "深度学习", "机器学习", "强化学习"], + "正面": ["正面临", "正面对"], + "应急": ["应急响应", "应急预案", "应急处理"], + "无果": ["无果而终", "毫无果断"], + # --- Production substring collision patterns (2026-03 manual review) --- + # "线数" inside "产线数据" → corrupts to "产线束据" + # (already covered above) + # "金流" inside "现金流" → corrupts to "现现金流" (replacement contains match) + "金流": ["现金流", "资金流", "资金流向", "现金流量"], + # "被看" inside "被看见" → corrupts to "被砍见" + "被看": ["被看见", "被看到", "被看作", "被看成", "被看好"], + # "亮亮" inside "漂漂亮亮" → corrupts to "漂漂亮哥" + "亮亮": ["漂漂亮亮", "亮亮堂堂", "明明亮亮"], + # "天差" inside "天差地别" → corrupts idiom to "偏差地别" + "天差": ["天差地别", "天差地远"], + # "看见" inside longer phrases → substring collision risk + "看见": ["被看见", "看见过", "没看见"], + # "分钟" inside longer phrases → substring collision risk + "分钟": ["几分钟", "十分钟", "三十分钟", "一分钟"], + # "出头" common in phrases + "出头": ["出头露面", "出头之日", "冒出头"], + # "初五" common in date phrases + "初五": ["正月初五", "大年初五"], + # "保健" common in compound words + "保健": ["保健品", "保健操", "医疗保健"], + # "内涵" common in compound words + "内涵": ["内涵段子", "文化内涵"], +} + +ALL_COMMON_WORDS: Set[str] = COMMON_WORDS_2CHAR | COMMON_WORDS_3PLUS + + +@dataclass +class SafetyWarning: + """A warning about a potentially dangerous correction rule.""" + level: str # "error" (should block) or "warning" (should confirm) + category: str # "common_word", "short_text", "substring_collision" + message: str + suggestion: str # What to do instead + + +def check_correction_safety( + from_text: str, + to_text: str, + strict: bool = True, +) -> List[SafetyWarning]: + """ + Check if a correction rule is safe to add to the dictionary. + + This is the main entry point. Returns a list of warnings/errors. + Empty list = safe to add. 
+ + Args: + from_text: The text to be replaced (the "wrong" text) + to_text: The replacement text (the "correct" text) + strict: If True, common word matches are errors; if False, warnings + + Returns: + List of SafetyWarning objects (empty = safe) + """ + warnings: List[SafetyWarning] = [] + + # Check 1: Is from_text a known common word? + if from_text in ALL_COMMON_WORDS: + level = "error" if strict else "warning" + warnings.append(SafetyWarning( + level=level, + category="common_word", + message=( + f"'{from_text}' is a common Chinese word that appears correctly " + f"in normal text. Replacing it with '{to_text}' will cause " + f"false positives across all transcripts." + ), + suggestion=( + f"Use a context rule instead: add a regex pattern that matches " + f"'{from_text}' only in the specific context where it's an ASR error. " + f"Example: match '{from_text}' only when preceded/followed by specific characters." + ), + )) + + # Check 2: Is from_text very short (<=2 chars)? + if len(from_text) <= 2: + # Even if not in our common words list, 2-char Chinese words are risky + if from_text not in ALL_COMMON_WORDS: + # Not already flagged above -- add a length warning + warnings.append(SafetyWarning( + level="warning", + category="short_text", + message=( + f"'{from_text}' is only {len(from_text)} character(s). " + f"Short corrections have high false positive risk in Chinese " + f"because they match as substrings inside longer words." + ), + suggestion=( + f"Verify '{from_text}' is never a valid word in any context. " + f"If unsure, use a context rule with surrounding text patterns instead." + ), + )) + + # Check 3: Could from_text match as a substring inside common words? + # This catches the "线数" matching inside "产线数据" bug. 
+ if from_text in SUBSTRING_COLLISION_MAP: + collisions = SUBSTRING_COLLISION_MAP[from_text] + warnings.append(SafetyWarning( + level="error" if strict else "warning", + category="substring_collision", + message=( + f"'{from_text}' is a substring of common words: " + f"{', '.join(collisions)}. " + f"Replacing '{from_text}' with '{to_text}' will corrupt these words." + ), + suggestion=( + f"Use a context rule with negative lookahead/lookbehind to exclude " + f"matches inside these common words. Example regex: " + f"'(?<!prefix){from_text}(?!suffix)' anchored to the colliding words." + ), + )) + + # Check 4 (heuristic): does from_text appear inside any other common word? + # NOTE(review): this span was reconstructed from a garbled extraction -- verify against the original patch. + _check_substring_in_common_words(from_text, to_text, warnings) + + return warnings + + +def _check_substring_in_common_words( + from_text: str, + to_text: str, + warnings: List[SafetyWarning], +) -> None: + """ + Check if from_text appears as a substring in any common word, + where the common word is NOT the from_text itself. + """ + if len(from_text) > 4: + # Long enough that substring collisions are unlikely to be problematic + return + + collisions: List[str] = [] + for word in ALL_COMMON_WORDS: + if word == from_text: + continue + if from_text in word: + collisions.append(word) + + if collisions: + # Only show first 5 to avoid spam + shown = collisions[:5] + more = f" (and {len(collisions) - 5} more)" if len(collisions) > 5 else "" + warnings.append(SafetyWarning( + level="warning", + category="substring_collision", + message=( + f"'{from_text}' appears inside {len(collisions)} common word(s): " + f"{', '.join(shown)}{more}. " + f"This replacement may cause collateral damage." + ), + suggestion=( + f"Review whether '{from_text}→{to_text}' could corrupt any of " + f"these words. Consider using a context rule instead." + ), + )) + + +def audit_corrections( + corrections: dict[str, str], +) -> dict[str, List[SafetyWarning]]: + """ + Audit all corrections in a dictionary for safety issues. + + Used by the --audit command. + + Args: + corrections: Dict of {from_text: to_text} + + Returns: + Dict of {from_text: [warnings]} for entries with issues. + Entries with no issues are not included.
+ """ + results: dict[str, List[SafetyWarning]] = {} + + for from_text, to_text in corrections.items(): + warnings = check_correction_safety(from_text, to_text, strict=False) + if warnings: + results[from_text] = warnings + + return results diff --git a/tunnel-doctor/SKILL.md b/tunnel-doctor/SKILL.md index f0e231a..4f3aee3 100644 --- a/tunnel-doctor/SKILL.md +++ b/tunnel-doctor/SKILL.md @@ -37,6 +37,8 @@ Determine which scenario applies: - **`git clone` fails with `Connection closed by 198.18.x.x`** → TUN DNS hijack for SSH (Step 2H) - **SSH connects but `operation not permitted`** → Tailscale SSH config issue (Step 4) - **SSH connects but `be-child ssh` exits code 1** → WSL snap sandbox issue (Step 5) +- **TCP port 22 reachable (`nc -z` succeeds) but SSH fails with `kex_exchange_identification: Connection closed`** → Tailscale SSH proxy intercept on WSL (Step 5A) +- **`tailscale ssh` returns "not available on App Store builds"** → Wrong Tailscale distribution on macOS (Step 5B) **Key distinctions**: - SSH does NOT use `http_proxy`/`NO_PROXY` env vars. If SSH works but HTTP doesn't → Layer 2. @@ -45,6 +47,8 @@ Determine which scenario applies: - If `ssh -T git@github.com` works but `git push` fails intermittently → Layer 4 (double tunnel). - If host `curl https://...` works but `docker pull` times out → Layer 5 (VM proxy propagation). - If DNS resolves to `198.18.x.x` virtual IPs → TUN DNS hijack (Step 2H). +- If `nc -z` succeeds on port 22 but SSH gets no banner (`kex_exchange_identification`) → Tailscale SSH proxy intercept (Step 5A). Confirm with `tcpdump -i any port 22` on the remote — 0 packets means Tailscale intercepts above the kernel. +- If `tailscale ssh` fails with "not available on App Store builds" → install Standalone Tailscale (Step 5B). ### Fast Path: Run Automated Checks @@ -96,6 +100,18 @@ export NO_PROXY=localhost,127.0.0.1,.ts.net,100.64.0.0/10,192.168.*,10.*,172.16. 
**NO_PROXY syntax pitfalls** — see [references/proxy_conflict_reference.md](references/proxy_conflict_reference.md) for the compatibility matrix. +**Go `net/http` CIDR caveat**: Go's standard `net/http` does NOT support CIDR notation in `NO_PROXY`. Setting `NO_PROXY=100.64.0.0/10` works for curl and Python, but Go programs (including Tailscale-adjacent tooling) will still send traffic through the proxy. The fix is to use MagicDNS hostnames (e.g., `workstation-4090-wsl`) instead of raw IPs, or add explicit hostnames to `NO_PROXY`: + +```bash +# WRONG for Go programs — CIDR is silently ignored +NO_PROXY=100.64.0.0/10 go-program http://100.101.102.103:8002/health # → goes through proxy + +# CORRECT — use hostname (matched as suffix) or explicit IP +export NO_PROXY=localhost,127.0.0.1,.ts.net,workstation-4090-wsl,100.101.102.103,192.168.*,10.*,172.16.* +``` + +This is especially relevant when accessing Tailscale services from Go-based tools (e.g., custom CLIs, Go test suites hitting remote APIs). + Verify the fix: ```bash @@ -127,6 +143,19 @@ gateway: 192.168.x.1 # Default gateway interface: en0 # Physical interface, NOT Tailscale ``` +**Important**: Not all `utun` interfaces are Tailscale's. Verify which utun belongs to Tailscale before concluding the route is correct: + +```bash +# Find Tailscale's utun interface (has a 100.x.x.x IP) +ifconfig | grep -A2 'inet 100\.' +``` + +Quick indicators by MTU: +- **MTU 1280** → typically Tailscale +- **MTU 4064** → typically Shadowrocket TUN + +If `route -n get` shows traffic going to a utun with MTU 4064, it is hitting Shadowrocket's TUN, not Tailscale — this is still a route conflict even though the interface name starts with `utun`. + Confirm with full route table: ```bash @@ -508,13 +537,89 @@ sudo tailscale up --ssh **Important**: The new installation may assign a different Tailscale IP. Check with `tailscale status --self`. 
+### Step 5A: Fix Tailscale SSH Proxy Silent Failure on WSL + +**Symptom**: TCP port 22 is reachable (`nc -z -w 5 <tailscale-ip> 22` succeeds), but SSH fails immediately with: + +``` +kex_exchange_identification: Connection closed by remote host +``` + +No SSH banner is ever received. This happens even with apt-installed Tailscale (not snap). + +**Root cause**: When `tailscale up --ssh` is enabled on WSL, Tailscale intercepts port 22 connections at the application layer (above the kernel network stack). If Tailscale's built-in SSH proxy malfunctions, it accepts the TCP connection but immediately closes it before sending the SSH banner. + +**Key diagnostic** — on the WSL instance: + +```bash +# This will show 0 packets even during active SSH attempts +sudo tcpdump -i any port 22 -c 5 -w /dev/null 2>&1 +``` + +Zero packets means Tailscale is intercepting connections before they reach the kernel network stack. The kernel's `sshd` never sees the connection. + +**Distinction from Step 5**: Step 5 covers snap sandbox issues where `be-child ssh` fails. This is a different problem — Tailscale's SSH proxy itself silently fails, regardless of installation method. + +**Fix** — disable Tailscale's SSH proxy and use regular sshd: + +```bash +# On the WSL instance: +sudo tailscale up --ssh=false + +# Verify sshd is running +sudo service ssh status +# If not running: +sudo service ssh start + +# Verify from the client machine: +ssh -o ConnectTimeout=10 <user>@<tailscale-ip> 'echo SSH_OK' +``` + +After disabling Tailscale SSH, connections go through the kernel network stack to `sshd` as normal. The Tailscale ACL `"action": "accept"` in Step 4 is no longer relevant — authentication is handled by `sshd` using SSH keys or passwords. + +**When to keep `--ssh` enabled**: Only if you specifically need Tailscale's SSH features (ACL-based access control, no SSH key management). If standard sshd works, prefer `--ssh=false` for reliability.
+ +### Step 5B: Fix App Store Tailscale on macOS (Missing `tailscale ssh`) + +**Symptom**: Running `tailscale ssh` returns: + +``` +The 'tailscale ssh' subcommand is not available on macOS builds +distributed through the App Store or TestFlight. +``` + +**Root cause**: The App Store version of Tailscale for macOS is sandboxed and does not include the `tailscale ssh` subcommand. + +**Fix** — install the Standalone version: + +1. Uninstall the App Store version (delete from /Applications) +2. Download the Standalone build from https://pkgs.tailscale.com/stable/#macos +3. Install to /Applications + +**Post-install CLI setup**: The standalone `tailscale` CLI binary is embedded inside the app bundle. Add an alias to your shell config: + +```bash +# ~/.zshrc +alias tailscale="/Applications/Tailscale.app/Contents/MacOS/Tailscale" +``` + +Verify: + +```bash +source ~/.zshrc +tailscale version +tailscale ssh <user>@<host> # Should work now +``` + ### Step 6: Verify End-to-End Run a complete connectivity test: ```bash -# 1. Check route is correct +# 1. Check route is correct (must show Tailscale's utun, not en0 or Shadowrocket's utun) route -n get <tailscale-ip> +# Also confirm which utun is Tailscale's: +ifconfig | grep -A2 'inet 100\.' # 2. Test TCP connectivity nc -z -w 5 <tailscale-ip> 22 @@ -523,7 +628,7 @@ nc -z -w 5 <tailscale-ip> 22 ssh -o ConnectTimeout=10 -o StrictHostKeyChecking=no <user>@<tailscale-ip> 'echo SSH_OK && hostname && whoami' ``` -All three must pass. If step 1 fails, revisit Step 3. If step 2 fails, check WSL sshd or firewall. If step 3 fails, revisit Steps 4-5. +All three must pass. If step 1 fails, revisit Step 3. If step 1 shows wrong utun (e.g., Shadowrocket's utun with MTU 4064 instead of Tailscale's with MTU 1280), that is also a route conflict. If step 2 passes but step 3 fails with `kex_exchange_identification`, revisit Step 5A (Tailscale SSH proxy intercept). If step 2 fails, check WSL sshd or firewall. If step 3 fails with other errors, revisit Steps 4-5.
## SOP: Remote Development via Tailscale @@ -612,7 +717,13 @@ Each `-L` flag is independent. If one port is already bound locally, `ExitOnForw ### 4. SSH Non-Login Shell Setup -SSH non-login shells don't load `~/.zshrc`, so nvm/Homebrew tools and proxy env vars are unavailable. Prefix all remote commands with `source ~/.zshrc 2>/dev/null;`. See [references/proxy_conflict_reference.md § SSH Non-Login Shell Pitfall](references/proxy_conflict_reference.md) for details and examples. +**This is a frequent source of "it works interactively but fails in scripts" bugs.** SSH non-login shells don't load `~/.zshrc` (or `~/.bashrc` on Linux), so tools installed via nvm, Homebrew, uv, cargo, or any shell-level manager won't be in `$PATH`. Proxy env vars set in `~/.zshrc` also won't be loaded. + +This affects **all** remote commands run via `ssh user@host "command"`, including CI/CD pipelines, cron-triggered SSH, and Makefile remote targets. Prefix all remote commands with `source ~/.zshrc 2>/dev/null;` (macOS) or `source ~/.bashrc 2>/dev/null;` (Linux/WSL). + +**Common failure**: `ssh user@host "uv run ..."` or `ssh user@host "node ..."` returns `command not found` even though the command works in an interactive SSH session. + +See [references/proxy_conflict_reference.md § SSH Non-Login Shell Pitfall](references/proxy_conflict_reference.md) for details and examples. 
For Makefile targets that run remote commands: diff --git a/tunnel-doctor/references/proxy_conflict_reference.md b/tunnel-doctor/references/proxy_conflict_reference.md index 03f35e3..34fed4c 100644 --- a/tunnel-doctor/references/proxy_conflict_reference.md +++ b/tunnel-doctor/references/proxy_conflict_reference.md @@ -68,6 +68,14 @@ NO_PROXY="" curl -s -X POST "http://:8080/api/ curl --noproxy '*' -s --connect-timeout 2 "http://192.168.31.110:8080/api/read" | head -1 ``` +**Port conflict warning**: Shadowrocket's config API listens on port 8080 by default, which may conflict with other services (e.g., whisper.cpp server, development proxies). If the API returns unexpected content (HTML, JSON from another service), verify what is actually listening on the port: + +```bash +lsof -nP -iTCP:8080 | head -5 +``` + +If another service owns port 8080, you need to either stop that service or access the Shadowrocket API from a different device on the same network. + **Critical**: Use `--data-binary`, NOT `-d`. The `-d` flag URL-encodes the content, corrupting `#`, `=`, `&` and other characters in the config. This **destroys the entire configuration** — all rules, settings, and proxy groups are lost. The user must restore from backup. ```bash @@ -88,6 +96,31 @@ tun-excluded-routes = 10.0.0.0/8, 127.0.0.0/8, 169.254.0.0/16, 172.16.0.0/12, 19 Note: `100.64.0.0/10` is intentionally absent. 
+### Complete Working Reference Config for Tailscale Compatibility + +This is a validated reference showing the correct relationship between `skip-proxy`, `tun-excluded-routes`, and `[Rule]` for Tailscale coexistence: + +``` +[General] +# skip-proxy: bypass the HTTP proxy for these destinations (fixes browser 503) +# 100.64.0.0/10 MUST be here for browser access to Tailscale IPs +skip-proxy = 192.168.0.0/16, 10.0.0.0/8, 172.16.0.0/12, 100.64.0.0/10, localhost, *.local, captive.apple.com + +# tun-excluded-routes: CIDRs excluded from TUN routing (sent directly via physical interface) +# 100.64.0.0/10 must NOT be here — including it creates an en0 route that overrides Tailscale +tun-excluded-routes = 10.0.0.0/8, 127.0.0.0/8, 169.254.0.0/16, 172.16.0.0/12, 192.0.0.0/24, 192.0.2.0/24, 192.88.99.0/24, 192.168.0.0/16, 198.51.100.0/24, 203.0.113.0/24, 224.0.0.0/4, 255.255.255.255/32 + +[Rule] +# Tailscale traffic enters TUN but is passed through without proxying +IP-CIDR,100.64.0.0/10,DIRECT +# ... other rules ... +``` + +**Key points**: +- `skip-proxy` — YES, include `100.64.0.0/10` (browser bypass) +- `tun-excluded-routes` — NO, never include `100.64.0.0/10` (would hijack routing) +- `[Rule]` — YES, include `IP-CIDR,100.64.0.0/10,DIRECT` (TUN passthrough) + ## Clash / ClashX Pro ### The Fix @@ -151,12 +184,15 @@ export NO_PROXY=localhost,127.0.0.1,.ts.net,100.64.0.0/10,192.168.*,10.*,172.16. 
### NO_PROXY Syntax Pitfalls -| Syntax | curl | Python requests | Node.js | Meaning | -|--------|------|-----------------|---------|---------| -| `.ts.net` | ✅ | ✅ | ✅ | Domain suffix match (correct) | -| `*.ts.net` | ❌ | ✅ | varies | Glob — curl does NOT support this | -| `100.64.0.0/10` | ✅ 7.86+ | ✅ 2.25+ | ❌ native | CIDR notation | -| `100.*` | ✅ | ✅ | ✅ | Too broad — covers public IPs `100.0-63.*` and `100.128-255.*` | +| Syntax | curl | Python requests | Go `net/http` | Node.js | Meaning | +|--------|------|-----------------|---------------|---------|---------| +| `.ts.net` | ✅ | ✅ | ✅ | ✅ | Domain suffix match (correct) | +| `*.ts.net` | ❌ | ✅ | ❌ | varies | Glob — curl and Go do NOT support this | +| `100.64.0.0/10` | ✅ 7.86+ | ✅ 2.25+ | ❌ | ❌ native | CIDR notation — Go silently ignores it | +| `100.*` | ✅ | ✅ | ❌ | ✅ | Too broad — covers public IPs `100.0-63.*` and `100.128-255.*` | +| `workstation-name` | ✅ | ✅ | ✅ | ✅ | Exact hostname match (safest for Go) | + +**Go `net/http` warning**: Go's proxy bypass logic (`httpproxy.Config.ProxyFunc`) does not implement CIDR matching. `NO_PROXY=100.64.0.0/10` is silently ignored — Go programs will still route traffic through the proxy. Use MagicDNS hostnames (e.g., `workstation-4090-wsl`) or explicit IPs (e.g., `100.101.102.103`) instead of CIDR ranges when Go programs need to bypass the proxy. **Key rule**: Always use `.ts.net` (leading dot, no asterisk) for domain suffix matching. This is the most portable syntax across all HTTP clients. @@ -282,9 +318,9 @@ Connection setup is slightly slower (~6s vs ~2s) because TUN routing has more ne ## General Principles -### Four Conflict Layers +### Five Conflict Layers -Proxy tools create conflicts at four independent layers on macOS. Layers 1-3 affect Tailscale connectivity; Layer 4 affects SSH git operations through the same proxy infrastructure: +Proxy tools create conflicts at five independent layers on macOS. 
Layers 1-3 affect Tailscale connectivity; Layer 4 affects SSH git operations; Layer 5 affects VM/container runtimes: | Layer | Setting | What it controls | Symptom when wrong | |-------|---------|------------------|--------------------| @@ -292,6 +328,7 @@ Proxy tools create conflicts at four independent layers on macOS. Layers 1-3 aff | 2. HTTP env vars | `http_proxy` / `NO_PROXY` | CLI tools (curl, wget, Python, Node.js) | `curl` times out, SSH works, browser works | | 3. System proxy | `skip-proxy` | Browser and system HTTP clients | Browser 503, `curl` works (both with/without proxy), SSH works | | 4. SSH ProxyCommand | `ProxyCommand connect -H` | SSH git operations (push/pull/clone) | `ssh -T` works, `git push` fails intermittently with `failed to begin relaying via HTTP` | +| 5. VM/Container proxy | Docker/OrbStack proxy config | `docker pull`, `docker build` | Host `curl` works, `docker pull` times out (TLS handshake timeout) | **Each layer is independent.** A fix at one layer doesn't help the others. You may need fixes at multiple layers simultaneously. @@ -398,4 +435,6 @@ If a proxy config change breaks Tailscale connectivity: sudo route delete -net 100.64.0.0/10 ``` +**Important**: Manually deleting a bad `en0` route with `sudo route delete` is only a temporary fix. Shadowrocket will re-add the route when the VPN connection is next reconnected or toggled. The only permanent fix is modifying the Shadowrocket configuration to remove `100.64.0.0/10` from `tun-excluded-routes` (it should never be there). + If `tun-excluded-routes` was modified, reverting it and restarting Shadowrocket will restore Tailscale's routing immediately. 
diff --git a/tunnel-doctor/scripts/quick_diagnose.py b/tunnel-doctor/scripts/quick_diagnose.py index 346ca8c..a53b833 100755 --- a/tunnel-doctor/scripts/quick_diagnose.py +++ b/tunnel-doctor/scripts/quick_diagnose.py @@ -167,6 +167,32 @@ def strict_tls_check(url: str, timeout: int) -> Dict[str, object]: } +def find_tailscale_utun() -> Optional[str]: + """Find which utun interface belongs to Tailscale (has a 100.x.x.x IP).""" + code, stdout, _ = run(["ifconfig"]) + if code != 0: + return None + current_iface = "" + for line in stdout.splitlines(): + # Interface header line (e.g., "utun7: flags=...") + m = re.match(r"^(\w+):", line) + if m: + current_iface = m.group(1) + # Look for Tailscale CGNAT IP on a utun interface + if current_iface.startswith("utun") and "inet 100." in line: + return current_iface + return None + + +def get_iface_mtu(iface: str) -> Optional[int]: + """Get MTU of a network interface.""" + code, stdout, _ = run(["ifconfig", iface]) + if code != 0: + return None + m = re.search(r"mtu\s+(\d+)", stdout) + return int(m.group(1)) if m else None + + def route_check(tailscale_ip: str) -> Dict[str, object]: code, stdout, stderr = run(["route", "-n", "get", tailscale_ip]) if code != 0: @@ -181,10 +207,24 @@ def route_check(tailscale_ip: str) -> Dict[str, object]: if line.startswith("gateway:"): gateway = line.split(":", 1)[1].strip() + # Identify which utun is Tailscale's and whether the route points to it + tailscale_utun = find_tailscale_utun() + route_mtu = get_iface_mtu(interface) if interface else None + is_tailscale_iface = (interface == tailscale_utun) if tailscale_utun else None + wrong_utun = ( + interface.startswith("utun") + and tailscale_utun is not None + and interface != tailscale_utun + ) + return { "ok": True, "interface": interface, "gateway": gateway, + "tailscale_utun": tailscale_utun or "", + "route_iface_mtu": route_mtu, + "is_tailscale_iface": is_tailscale_iface, + "wrong_utun": wrong_utun, "raw": stdout, } @@ -300,6 +340,10 @@ def 
build_report( route_info = route_check(tailscale_ip) if tailscale_ip else None if route_info and route_info["ok"]: iface = str(route_info["interface"]) + ts_utun = str(route_info.get("tailscale_utun", "")) + route_mtu = route_info.get("route_iface_mtu") + wrong_utun = route_info.get("wrong_utun", False) + if iface.startswith("en"): findings.append( { @@ -311,6 +355,23 @@ def build_report( ), } ) + elif wrong_utun: + mtu_hint = f" (MTU {route_mtu})" if route_mtu else "" + findings.append( + { + "level": "error", + "title": "Route points to wrong utun interface", + "detail": ( + f"route -n get {tailscale_ip} resolved to {iface}{mtu_hint}, " + f"but Tailscale is on {ts_utun}. " + f"Likely hitting Shadowrocket/VPN TUN (MTU 4064) instead of Tailscale (MTU 1280)." + ), + "fix": ( + "Check proxy TUN excluded-routes and rule ordering. " + "Ensure IP-CIDR,100.64.0.0/10,DIRECT is in proxy rules." + ), + } + ) summary = { "host": host, @@ -373,8 +434,21 @@ def print_human(report: Dict[str, object]) -> int: if route: if route.get("ok"): print("Tailscale Route Check") - print(f"- interface: {route.get('interface') or 'N/A'}") - print(f"- gateway: {route.get('gateway') or 'N/A'}") + print(f"- route interface: {route.get('interface') or 'N/A'}") + route_mtu = route.get("route_iface_mtu") + if route_mtu: + print(f" route iface MTU: {route_mtu}") + print(f"- gateway: {route.get('gateway') or 'N/A'}") + ts_utun = route.get("tailscale_utun") + if ts_utun: + print(f"- tailscale utun: {ts_utun}") + is_ts = route.get("is_tailscale_iface") + if is_ts is True: + print(" route → Tailscale utun: YES (correct)") + elif is_ts is False: + print(" route → Tailscale utun: NO (MISMATCH — see findings)") + else: + print("- tailscale utun: (not detected — is Tailscale running?)") print("") else: print("Tailscale Route Check")