feat: add video tutorial scraping pipeline with per-panel OCR and AI enhancement

Add complete video tutorial extraction system that converts YouTube videos
and local video files into AI-consumable skills. The pipeline extracts
transcripts, performs visual OCR on code editor panels independently,
tracks code evolution across frames, and generates structured SKILL.md output.

Key features:
- Video metadata extraction (YouTube, local files, playlists)
- Multi-source transcript extraction (YouTube API, yt-dlp, Whisper fallback)
- Chapter-based and time-window segmentation
- Visual extraction: keyframe detection, frame classification, panel detection
- Per-panel sub-section OCR (each IDE panel OCR'd independently)
- Parallel OCR with ThreadPoolExecutor for multi-panel frames
- Narrow panel filtering (300px min width) to skip UI chrome
- Text block tracking with spatial panel position matching
- Code timeline with edit tracking across frames
- Audio-visual alignment (code + narrator pairs)
- Video-specific AI enhancement prompt for OCR denoising and code reconstruction
- video-tutorial.yaml workflow with 4 stages (OCR cleanup, language detection,
  tutorial synthesis, skill polish)
- CLI integration: skill-seekers video --url/--video-file/--playlist
- MCP tool: scrape_video for automation
- 161 tests passing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
YusufKaraaslanSpyke
2026-02-27 23:10:19 +03:00
parent 3bad7cf365
commit 62071c4aa9
32 changed files with 15090 additions and 9 deletions

View File

@@ -401,6 +401,86 @@ WORD_ARGUMENTS: dict[str, dict[str, Any]] = {
},
}
# Video specific (from video.py)
# Declarative table of the video-specific options. Each entry maps an
# argument name to the positional ``flags`` and keyword ``kwargs`` that are
# later splatted into ``parser.add_argument(*flags, **kwargs)``.
VIDEO_ARGUMENTS: dict[str, dict[str, Any]] = {
    # --- Source selection (flags carry a ``--video-`` prefix here) ---------
    "video_url": {
        "flags": ("--video-url",),
        "kwargs": dict(type=str, help="Video URL (YouTube, Vimeo)", metavar="URL"),
    },
    "video_file": {
        "flags": ("--video-file",),
        "kwargs": dict(type=str, help="Local video file path", metavar="PATH"),
    },
    "video_playlist": {
        "flags": ("--video-playlist",),
        "kwargs": dict(type=str, help="Playlist URL", metavar="URL"),
    },
    # --- Transcript options -----------------------------------------------
    "video_languages": {
        "flags": ("--video-languages",),
        "kwargs": dict(
            type=str,
            default="en",
            help="Transcript language preference (comma-separated)",
            metavar="LANGS",
        ),
    },
    # --- Visual extraction ------------------------------------------------
    "visual": {
        "flags": ("--visual",),
        "kwargs": dict(
            action="store_true",
            help="Enable visual extraction (requires video-full deps)",
        ),
    },
    "whisper_model": {
        "flags": ("--whisper-model",),
        "kwargs": dict(
            type=str,
            default="base",
            help="Whisper model size (default: base)",
            metavar="MODEL",
        ),
    },
    # --- Visual extraction tuning knobs (all floats) ----------------------
    "visual_interval": {
        "flags": ("--visual-interval",),
        "kwargs": dict(
            type=float,
            default=0.7,
            help="Visual scan interval in seconds (default: 0.7)",
            metavar="SECS",
        ),
    },
    "visual_min_gap": {
        "flags": ("--visual-min-gap",),
        "kwargs": dict(
            type=float,
            default=0.5,
            help="Min gap between extracted frames in seconds (default: 0.5)",
            metavar="SECS",
        ),
    },
    "visual_similarity": {
        "flags": ("--visual-similarity",),
        "kwargs": dict(
            type=float,
            default=3.0,
            help="Pixel-diff threshold for duplicate detection; lower = more frames (default: 3.0)",
            metavar="THRESH",
        ),
    },
}
# Multi-source config specific (from unified_scraper.py)
CONFIG_ARGUMENTS: dict[str, dict[str, Any]] = {
"merge_mode": {
@@ -484,6 +564,7 @@ def get_source_specific_arguments(source_type: str) -> dict[str, dict[str, Any]]
"local": LOCAL_ARGUMENTS,
"pdf": PDF_ARGUMENTS,
"word": WORD_ARGUMENTS,
"video": VIDEO_ARGUMENTS,
"config": CONFIG_ARGUMENTS,
}
return source_args.get(source_type, {})
@@ -521,6 +602,7 @@ def add_create_arguments(parser: argparse.ArgumentParser, mode: str = "default")
- 'local': Universal + local-specific
- 'pdf': Universal + pdf-specific
- 'word': Universal + word-specific
- 'video': Universal + video-specific
- 'advanced': Advanced/rare arguments
- 'all': All 120+ arguments
@@ -561,6 +643,10 @@ def add_create_arguments(parser: argparse.ArgumentParser, mode: str = "default")
for arg_name, arg_def in WORD_ARGUMENTS.items():
parser.add_argument(*arg_def["flags"], **arg_def["kwargs"])
if mode in ["video", "all"]:
for arg_name, arg_def in VIDEO_ARGUMENTS.items():
parser.add_argument(*arg_def["flags"], **arg_def["kwargs"])
if mode in ["config", "all"]:
for arg_name, arg_def in CONFIG_ARGUMENTS.items():
parser.add_argument(*arg_def["flags"], **arg_def["kwargs"])

View File

@@ -0,0 +1,141 @@
"""Video command argument definitions.
This module defines ALL arguments for the video command in ONE place.
Both video_scraper.py (standalone) and parsers/video_parser.py (unified CLI)
import and use these definitions.
Shared arguments (name, description, output, enhance-level, api-key,
dry-run, verbose, quiet, workflow args) come from common.py / workflow.py
via ``add_all_standard_arguments()``.
"""
import argparse
from typing import Any
from .common import add_all_standard_arguments
# Video-specific argument definitions as data structure
# NOTE: Shared args (name, description, output, enhance_level, api_key, dry_run,
# verbose, quiet, workflow args) are registered by add_all_standard_arguments().
# Each entry maps an argument name to the ``flags`` (positional) and
# ``kwargs`` (keyword) that are splatted into ``parser.add_argument``.
VIDEO_ARGUMENTS: dict[str, dict[str, Any]] = {
    # --- Input source -----------------------------------------------------
    "url": {
        "flags": ("--url",),
        "kwargs": dict(type=str, help="Video URL (YouTube, Vimeo)", metavar="URL"),
    },
    "video_file": {
        "flags": ("--video-file",),
        "kwargs": dict(type=str, help="Local video file path", metavar="PATH"),
    },
    "playlist": {
        "flags": ("--playlist",),
        "kwargs": dict(type=str, help="Playlist URL", metavar="URL"),
    },
    # --- Transcript -------------------------------------------------------
    "languages": {
        "flags": ("--languages",),
        "kwargs": dict(
            type=str,
            default="en",
            help="Transcript language preference (comma-separated, default: en)",
            metavar="LANGS",
        ),
    },
    # --- Visual extraction switch and speech model ------------------------
    "visual": {
        "flags": ("--visual",),
        "kwargs": dict(
            action="store_true",
            help="Enable visual extraction (requires video-full deps)",
        ),
    },
    "whisper_model": {
        "flags": ("--whisper-model",),
        "kwargs": dict(
            type=str,
            default="base",
            help="Whisper model size (default: base)",
            metavar="MODEL",
        ),
    },
    # --- Rebuild from a previous extraction -------------------------------
    "from_json": {
        "flags": ("--from-json",),
        "kwargs": dict(type=str, help="Build skill from extracted JSON", metavar="FILE"),
    },
    # --- Visual extraction tuning knobs (all floats) ----------------------
    "visual_interval": {
        "flags": ("--visual-interval",),
        "kwargs": dict(
            type=float,
            default=0.7,
            help="Visual scan interval in seconds (default: 0.7)",
            metavar="SECS",
        ),
    },
    "visual_min_gap": {
        "flags": ("--visual-min-gap",),
        "kwargs": dict(
            type=float,
            default=0.5,
            help="Minimum gap between extracted frames in seconds (default: 0.5)",
            metavar="SECS",
        ),
    },
    "visual_similarity": {
        "flags": ("--visual-similarity",),
        "kwargs": dict(
            type=float,
            default=3.0,
            help="Pixel-diff threshold for duplicate frame detection; lower = more frames kept (default: 3.0)",
            metavar="THRESH",
        ),
    },
    # --- Optional vision-model OCR fallback -------------------------------
    "vision_ocr": {
        "flags": ("--vision-ocr",),
        "kwargs": dict(
            action="store_true",
            help="Use Claude Vision API as fallback for low-confidence code frames (requires ANTHROPIC_API_KEY, ~$0.004/frame)",
        ),
    },
}
def add_video_arguments(parser: argparse.ArgumentParser) -> None:
    """Register every argument of the video command on ``parser``.

    The shared arguments are registered first through
    ``add_all_standard_arguments()``. The already-registered
    ``enhance_level`` action is then patched so that its default is 0 and
    its help text says so, and finally each entry of ``VIDEO_ARGUMENTS`` is
    added.

    Args:
        parser: The argparse parser (or sub-parser) to populate in place.
    """
    add_all_standard_arguments(parser)

    video_enhance_help = (
        "AI enhancement level (auto-detects API vs LOCAL mode): "
        "0=disabled (default for video), 1=SKILL.md only, 2=+architecture/config, 3=full enhancement. "
        "Mode selection: uses API if ANTHROPIC_API_KEY is set, otherwise LOCAL (Claude Code)"
    )
    # The action was created by the shared registration above, so it is
    # patched in place rather than re-added (re-adding would conflict).
    for registered in parser._actions:
        if getattr(registered, "dest", None) != "enhance_level":
            continue
        registered.default = 0
        registered.help = video_enhance_help

    for definition in VIDEO_ARGUMENTS.values():
        parser.add_argument(*definition["flags"], **definition["kwargs"])

View File

@@ -27,7 +27,7 @@ class ConfigValidator:
"""
# Valid source types
VALID_SOURCE_TYPES = {"documentation", "github", "pdf", "local"}
VALID_SOURCE_TYPES = {"documentation", "github", "pdf", "local", "word", "video"}
# Valid merge modes
VALID_MERGE_MODES = {"rule-based", "claude-enhanced"}

View File

@@ -133,6 +133,8 @@ class CreateCommand:
return self._route_pdf()
elif self.source_info.type == "word":
return self._route_word()
elif self.source_info.type == "video":
return self._route_video()
elif self.source_info.type == "config":
return self._route_config()
else:
@@ -345,6 +347,55 @@ class CreateCommand:
finally:
sys.argv = original_argv
def _route_video(self) -> int:
    """Dispatch the create command to ``video_scraper.main()``.

    A fresh ``argv`` is assembled from the detected source and the parsed
    create arguments, ``sys.argv`` is swapped for the duration of the call
    and restored afterwards.

    Returns:
        Whatever ``video_scraper.main()`` returns.
    """
    from skill_seekers.cli import video_scraper

    detected = self.source_info.parsed
    argv = ["video_scraper"]

    # Source: a local file wins; otherwise a URL, sent to --playlist when
    # the URL text contains "playlist" and to --url in every other case.
    if detected.get("source_kind") == "file":
        argv.extend(["--video-file", detected["file_path"]])
    elif detected.get("url"):
        target = detected["url"]
        source_flag = "--playlist" if "playlist" in target.lower() else "--url"
        argv.extend([source_flag, target])

    # Universal arguments shared by every scraper
    self._add_common_args(argv)

    # Video-specific arguments
    languages = getattr(self.args, "video_languages", None) or getattr(
        self.args, "languages", None
    )
    if languages:
        argv.extend(["--languages", languages])

    if getattr(self.args, "visual", False):
        argv.append("--visual")

    whisper_model = getattr(self.args, "whisper_model", None)
    if whisper_model and whisper_model != "base":
        argv.extend(["--whisper-model", whisper_model])

    # Float options are forwarded only when they differ from the default.
    float_options = (
        ("visual_interval", "--visual-interval", 0.7),
        ("visual_min_gap", "--visual-min-gap", 0.5),
        ("visual_similarity", "--visual-similarity", 3.0),
    )
    for attr_name, flag, default_value in float_options:
        value = getattr(self.args, attr_name, None)
        if value is not None and value != default_value:
            argv.extend([flag, str(value)])

    # Call video_scraper with the rebuilt argv, always restoring the old one
    logger.debug(f"Calling video_scraper with argv: {argv}")
    saved_argv = sys.argv
    sys.argv = argv
    try:
        return video_scraper.main()
    finally:
        sys.argv = saved_argv
def _route_config(self) -> int:
"""Route to unified scraper for config files (unified_scraper.py)."""
from skill_seekers.cli import unified_scraper
@@ -468,6 +519,8 @@ Examples:
Local: skill-seekers create ./my-project -p comprehensive
PDF: skill-seekers create tutorial.pdf --ocr
DOCX: skill-seekers create document.docx
Video: skill-seekers create https://youtube.com/watch?v=...
Video: skill-seekers create recording.mp4
Config: skill-seekers create configs/react.json
Source Auto-Detection:
@@ -476,6 +529,8 @@ Source Auto-Detection:
• ./path → local codebase
• file.pdf → PDF extraction
• file.docx → Word document extraction
• youtube.com/... → Video transcript extraction
• file.mp4 → Video file extraction
• file.json → multi-source config
Progressive Help (13 → 120+ flags):
@@ -483,6 +538,7 @@ Progressive Help (13 → 120+ flags):
--help-github GitHub repository options
--help-local Local codebase analysis
--help-pdf PDF extraction options
--help-video Video extraction options
--help-advanced Rare/advanced options
--help-all All options + compatibility
@@ -513,6 +569,9 @@ Common Workflows:
parser.add_argument(
"--help-word", action="store_true", help=argparse.SUPPRESS, dest="_help_word"
)
parser.add_argument(
"--help-video", action="store_true", help=argparse.SUPPRESS, dest="_help_video"
)
parser.add_argument(
"--help-config", action="store_true", help=argparse.SUPPRESS, dest="_help_config"
)
@@ -571,6 +630,15 @@ Common Workflows:
add_create_arguments(parser_word, mode="word")
parser_word.print_help()
return 0
elif args._help_video:
parser_video = argparse.ArgumentParser(
prog="skill-seekers create",
description="Create skill from video (YouTube, Vimeo, local files)",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
add_create_arguments(parser_video, mode="video")
parser_video.print_help()
return 0
elif args._help_config:
parser_config = argparse.ArgumentParser(
prog="skill-seekers create",

View File

@@ -97,9 +97,17 @@ class SkillEnhancer:
print(f"❌ Error calling Claude API: {e}")
return None
def _is_video_source(self, references):
"""Check if the references come from video tutorial extraction."""
return any(meta["source"] == "video_tutorial" for meta in references.values())
def _build_enhancement_prompt(self, references, current_skill_md):
"""Build the prompt for Claude with multi-source awareness"""
# Dispatch to video-specific prompt if video source detected
if self._is_video_source(references):
return self._build_video_enhancement_prompt(references, current_skill_md)
# Extract skill name and description
skill_name = self.skill_dir.name
@@ -276,6 +284,148 @@ Return ONLY the complete SKILL.md content, starting with the frontmatter (---).
return prompt
def _build_video_enhancement_prompt(self, references, current_skill_md):
"""Build a video-specific enhancement prompt.
Video tutorial references contain transcript text, OCR'd code panels,
code timelines with edits, and audio-visual alignment pairs. This prompt
is tailored to reconstruct clean code from noisy OCR, detect programming
languages from context, and synthesize a coherent tutorial skill.
"""
skill_name = self.skill_dir.name
prompt = f"""You are enhancing a Claude skill built from VIDEO TUTORIAL extraction. This skill is about: {skill_name}
The raw data was extracted from video tutorials using:
1. **Transcript** (speech-to-text) — HIGH quality, this is the primary signal
2. **OCR on code panels** — NOISY, may contain line numbers, UI chrome, garbled text
3. **Code Timeline** — Tracks code evolution across frames with diffs
4. **Audio-Visual Alignment** — Pairs of on-screen code + narrator explanation
CURRENT SKILL.MD:
{"```markdown" if current_skill_md else "(none - create from scratch)"}
{current_skill_md or "No existing SKILL.md"}
{"```" if current_skill_md else ""}
REFERENCE FILES:
"""
# Add all reference content
for filename, metadata in references.items():
content = metadata["content"]
if len(content) > 30000:
content = content[:30000] + "\n\n[Content truncated for size...]"
prompt += f"\n#### {filename}\n"
prompt += f"*Source: {metadata['source']}, Confidence: {metadata['confidence']}*\n\n"
prompt += f"```markdown\n{content}\n```\n"
prompt += """
VIDEO-SPECIFIC ENHANCEMENT INSTRUCTIONS:
You are working with data extracted from programming tutorial videos. The data has
specific characteristics you MUST handle:
## 1. OCR Code Reconstruction (CRITICAL)
The OCR'd code blocks are NOISY. Common issues you MUST fix:
- **Line numbers in code**: OCR captures line numbers (1, 2, 3...) as part of the code — STRIP THEM
- **UI chrome contamination**: Tab bars, file names, button text appear in code blocks — REMOVE
- **Garbled characters**: OCR errors like `l` → `1`, `O` → `0`, `rn` → `m` — FIX using context
- **Duplicate fragments**: Same code appears across multiple frames with minor OCR variations — DEDUPLICATE
- **Incomplete lines**: Lines cut off at panel edges — RECONSTRUCT from transcript context
- **Animation/timeline numbers**: Frame counters or timeline numbers in code — REMOVE
When reconstructing code:
- The TRANSCRIPT is the ground truth for WHAT the code does
- The OCR is the ground truth for HOW the code looks (syntax, structure)
- Combine both: use transcript to understand intent, OCR for actual code structure
- If OCR is too garbled, reconstruct the code based on what the narrator describes
## 2. Language Detection
The OCR-based language detection is often WRONG. Fix it by:
- Reading the transcript for language mentions ("in GDScript", "this Python function", "our C# class")
- Using code patterns: `extends`, `func`, `var`, `signal` = GDScript; `def`, `class`, `import` = Python;
`function`, `const`, `let` = JavaScript/TypeScript; `using`, `namespace` = C#
- Looking at file extensions mentioned in the transcript or visible in tab bars
- Using proper language tags in all code fences (```gdscript, ```python, etc.)
## 3. Code Timeline Processing
The "Code Timeline" section shows how code EVOLVES during the tutorial. Use it to:
- Show the FINAL version of each code block (not intermediate states)
- Optionally show key intermediate steps if the tutorial is about building up code progressively
- The edit diffs show exactly what changed between frames — use these to understand the tutorial flow
## 4. Audio-Visual Alignment
These are the MOST VALUABLE pairs: each links on-screen code with the narrator's explanation.
- Use these to create annotated code examples with inline comments
- The narrator text explains WHY each piece of code exists
- Cross-reference these pairs to build the "how-to" sections
## 5. Tutorial Structure
Transform the raw chronological data into a LOGICAL tutorial structure:
- Group by TOPIC, not by timestamp (e.g., "Setting Up the State Machine" not "Segment 3")
- Create clear section headers that describe what is being TAUGHT
- Build a progressive learning path: concepts build on each other
- Include prerequisite knowledge mentioned by the narrator
YOUR TASK — Create an enhanced SKILL.md:
1. **Clean Overview Section**
- What does this tutorial teach? (from transcript, NOT generic)
- Prerequisites mentioned by the narrator
- Key technologies/frameworks used (from actual code, not guesses)
2. **"When to Use This Skill" Section**
- Specific trigger conditions based on what the tutorial covers
- Use cases directly from the tutorial content
- Reference the framework/library/tool being taught
3. **Quick Reference Section** (MOST IMPORTANT)
- Extract 5-10 CLEAN, reconstructed code examples
- Each example must be:
a. Denoised (no line numbers, no UI chrome, no garbled text)
b. Complete (not cut off mid-line)
c. Properly language-tagged
d. Annotated with a description from the transcript
- Prefer code from Audio-Visual Alignment pairs (they have narrator context)
- Show the FINAL working version of each code block
4. **Step-by-Step Tutorial Section**
- Follow the tutorial's teaching flow
- Each step includes: clean code + explanation from transcript
- Use narrator's explanations as the descriptions (paraphrase, don't copy verbatim)
- Show code evolution where the tutorial builds up code incrementally
5. **Key Concepts Section**
- Extract terminology and concepts the narrator explains
- Define them using the narrator's own explanations
- Link concepts to specific code examples
6. **Reference Files Description**
- Explain what each reference file contains
- Note that OCR data is raw and may contain errors
- Point to the most useful sections (Audio-Visual Alignment, Code Timeline)
7. **Keep the frontmatter** (---\\nname: ...\\n---) intact if present
CRITICAL RULES:
- NEVER include raw OCR text with line numbers or UI chrome — always clean it first
- ALWAYS use correct language tags (detect from context, not from OCR metadata)
- The transcript is your BEST source for understanding content — trust it over garbled OCR
- Extract REAL code from the references, reconstruct where needed, but never invent code
- Keep code examples SHORT and focused (5-30 lines max per example)
- Make the skill actionable: someone reading it should be able to implement what the tutorial teaches
OUTPUT:
Return ONLY the complete SKILL.md content, starting with the frontmatter (---).
"""
return prompt
def save_enhanced_skill_md(self, content):
"""Save the enhanced SKILL.md"""
# Backup original

View File

@@ -48,6 +48,7 @@ COMMAND_MODULES = {
"github": "skill_seekers.cli.github_scraper",
"pdf": "skill_seekers.cli.pdf_scraper",
"word": "skill_seekers.cli.word_scraper",
"video": "skill_seekers.cli.video_scraper",
"unified": "skill_seekers.cli.unified_scraper",
"enhance": "skill_seekers.cli.enhance_command",
"enhance-status": "skill_seekers.cli.enhance_status",
@@ -142,7 +143,6 @@ def _reconstruct_argv(command: str, args: argparse.Namespace) -> list[str]:
# Handle positional arguments (no -- prefix)
if key in [
"source", # create command
"url",
"directory",
"file",
"job_id",

View File

@@ -13,6 +13,7 @@ from .scrape_parser import ScrapeParser
from .github_parser import GitHubParser
from .pdf_parser import PDFParser
from .word_parser import WordParser
from .video_parser import VideoParser
from .unified_parser import UnifiedParser
from .enhance_parser import EnhanceParser
from .enhance_status_parser import EnhanceStatusParser
@@ -43,6 +44,7 @@ PARSERS = [
EnhanceStatusParser(),
PDFParser(),
WordParser(),
VideoParser(),
UnifiedParser(),
EstimateParser(),
InstallParser(),

View File

@@ -0,0 +1,32 @@
"""Video subcommand parser.
Uses shared argument definitions from arguments.video to ensure
consistency with the standalone video_scraper module.
"""
from .base import SubcommandParser
from skill_seekers.cli.arguments.video import add_video_arguments
class VideoParser(SubcommandParser):
    """Subcommand parser for ``video``.

    All argument definitions are delegated to ``add_video_arguments`` so
    that this parser and the standalone video scraper register the same set.
    """

    @property
    def name(self) -> str:
        """Name of the subcommand."""
        return "video"

    @property
    def help(self) -> str:
        """Short help string for the subcommand."""
        return "Extract from video (YouTube, local files)"

    @property
    def description(self) -> str:
        """Longer description string for the subcommand."""
        return "Extract transcripts and metadata from videos and generate skill"

    def add_arguments(self, parser):
        """Populate ``parser`` with the shared video argument set.

        Args:
            parser: The argparse parser to populate in place.
        """
        add_video_arguments(parser)

View File

@@ -63,24 +63,34 @@ class SourceDetector:
if source.endswith(".docx"):
return cls._detect_word(source)
# 2. Directory detection
# Video file extensions
VIDEO_EXTENSIONS = (".mp4", ".mkv", ".avi", ".mov", ".webm", ".flv", ".wmv")
if source.lower().endswith(VIDEO_EXTENSIONS):
return cls._detect_video_file(source)
# 2. Video URL detection (before directory check)
video_url_info = cls._detect_video_url(source)
if video_url_info:
return video_url_info
# 3. Directory detection
if os.path.isdir(source):
return cls._detect_local(source)
# 3. GitHub patterns
# 4. GitHub patterns
github_info = cls._detect_github(source)
if github_info:
return github_info
# 4. URL detection
# 5. URL detection
if source.startswith("http://") or source.startswith("https://"):
return cls._detect_web(source)
# 5. Domain inference (add https://)
# 6. Domain inference (add https://)
if "." in source and not source.startswith("/"):
return cls._detect_web(f"https://{source}")
# 6. Error - cannot determine
# 7. Error - cannot determine
raise ValueError(
f"Cannot determine source type for: {source}\n\n"
"Examples:\n"
@@ -89,6 +99,8 @@ class SourceDetector:
" Local: skill-seekers create ./my-project\n"
" PDF: skill-seekers create tutorial.pdf\n"
" DOCX: skill-seekers create document.docx\n"
" Video: skill-seekers create https://youtube.com/watch?v=...\n"
" Video: skill-seekers create recording.mp4\n"
" Config: skill-seekers create configs/react.json"
)
@@ -116,6 +128,55 @@ class SourceDetector:
type="word", parsed={"file_path": source}, suggested_name=name, raw_input=source
)
@classmethod
def _detect_video_file(cls, source: str) -> SourceInfo:
    """Build the ``SourceInfo`` for a local video file.

    The suggested skill name is the file's base name without its extension.
    The path itself is stored unchanged and is not checked for existence
    here.
    """
    filename = os.path.basename(source)
    stem, _extension = os.path.splitext(filename)
    parsed = {"file_path": source, "source_kind": "file"}
    return SourceInfo(type="video", parsed=parsed, suggested_name=stem, raw_input=source)
@classmethod
def _detect_video_url(cls, source: str) -> SourceInfo | None:
    """Recognise a video platform URL (YouTube, Vimeo).

    Matching is a case-insensitive substring test against known URL
    fragments; the original ``source`` string is what gets stored.

    Returns:
        A ``SourceInfo`` of type ``"video"`` when ``source`` looks like a
        video URL, otherwise ``None``.
    """
    lowered = source.lower()

    youtube_markers = (
        "youtube.com/watch",
        "youtu.be/",
        "youtube.com/playlist",
        "youtube.com/@",
        "youtube.com/channel/",
        "youtube.com/c/",
        "youtube.com/shorts/",
        "youtube.com/embed/",
    )
    channel_markers = ("/@", "/channel/", "/c/")

    if any(marker in lowered for marker in youtube_markers):
        # Playlist takes precedence over channel, which takes precedence
        # over a plain video.
        if "playlist" in lowered:
            suggested = "youtube_playlist"
        elif any(marker in lowered for marker in channel_markers):
            suggested = "youtube_channel"
        else:
            suggested = "youtube_video"
    elif "vimeo.com/" in lowered:
        suggested = "vimeo_video"
    else:
        return None

    return SourceInfo(
        type="video",
        parsed={"url": source, "source_kind": "url"},
        suggested_name=suggested,
        raw_input=source,
    )
@classmethod
def _detect_local(cls, source: str) -> SourceInfo:
"""Detect local directory source."""
@@ -209,6 +270,15 @@ class SourceDetector:
if not os.path.isfile(file_path):
raise ValueError(f"Path is not a file: {file_path}")
elif source_info.type == "video":
if source_info.parsed.get("source_kind") == "file":
file_path = source_info.parsed["file_path"]
if not os.path.exists(file_path):
raise ValueError(f"Video file does not exist: {file_path}")
if not os.path.isfile(file_path):
raise ValueError(f"Path is not a file: {file_path}")
# URL-based video sources are validated during processing
elif source_info.type == "config":
config_path = source_info.parsed["config_path"]
if not os.path.exists(config_path):

View File

@@ -74,11 +74,12 @@ class UnifiedScraper:
"github": [], # List of github sources
"pdf": [], # List of pdf sources
"word": [], # List of word sources
"video": [], # List of video sources
"local": [], # List of local sources (docs or code)
}
# Track source index for unique naming (multi-source support)
self._source_counters = {"documentation": 0, "github": 0, "pdf": 0, "word": 0, "local": 0}
self._source_counters = {"documentation": 0, "github": 0, "pdf": 0, "word": 0, "video": 0, "local": 0}
# Output paths - cleaner organization
self.name = self.config["name"]
@@ -154,6 +155,8 @@ class UnifiedScraper:
self._scrape_pdf(source)
elif source_type == "word":
self._scrape_word(source)
elif source_type == "video":
self._scrape_video(source)
elif source_type == "local":
self._scrape_local(source)
else:
@@ -576,6 +579,63 @@ class UnifiedScraper:
logger.info(f"✅ Word: {len(word_data.get('pages', []))} sections extracted")
def _scrape_video(self, source: dict[str, Any]):
    """Scrape one video source (YouTube, local file, etc.) and record the result.

    Reads ``url``, ``path``, ``playlist``, ``description``, ``languages``,
    ``visual_extraction`` and ``whisper_model`` from ``source``, runs a
    ``VideoToSkillConverter`` and appends the outcome to
    ``self.scraped_data["video"]``. If the converter module cannot be
    imported the method logs an error and returns. Exceptions raised while
    processing, saving or building are logged and not re-raised.

    Args:
        source: One video entry of the unified config. ``languages`` may be
            a list of language codes or an already comma-separated string.
    """
    try:
        from skill_seekers.cli.video_scraper import VideoToSkillConverter
    except ImportError:
        logger.error("video_scraper.py not found")
        return

    # Multi-source support: Get unique index for this video source
    idx = self._source_counters["video"]
    self._source_counters["video"] += 1

    # Determine video identifier
    video_url = source.get("url", "")
    video_id = video_url or source.get("path", f"video_{idx}")

    # Normalise the language preference to a comma-separated string.
    # Joining a plain string would split it into characters
    # ("en" -> "e,n"), so strings are passed through untouched.
    languages = source.get("languages")
    if languages is None:
        languages = ["en"]
    if not isinstance(languages, str):
        languages = ",".join(languages)

    # Create config for video scraper
    video_config = {
        "name": f"{self.name}_video_{idx}",
        "url": source.get("url"),
        "video_file": source.get("path"),
        "playlist": source.get("playlist"),
        "description": source.get("description", ""),
        "languages": languages,
        "visual": source.get("visual_extraction", False),
        "whisper_model": source.get("whisper_model", "base"),
    }

    # Process video
    logger.info(f"Scraping video: {video_id}")
    converter = VideoToSkillConverter(video_config)
    try:
        result = converter.process()
        converter.save_extracted_data()

        # Append to list
        self.scraped_data["video"].append(
            {
                "video_id": video_id,
                "idx": idx,
                "data": result.to_dict(),
                "data_file": converter.data_file,
            }
        )

        # Build standalone SKILL.md for synthesis
        converter.build_skill()
        logger.info("✅ Video: Standalone SKILL.md created")
        logger.info(
            f"✅ Video: {len(result.videos)} videos, "
            f"{result.total_segments} segments extracted"
        )
    except Exception as e:
        # Best effort: one failing video source must not abort the others.
        logger.error(f"Failed to process video source: {e}")
def _scrape_local(self, source: dict[str, Any]):
"""
Scrape local directory (documentation files or source code).

View File

@@ -289,6 +289,10 @@ def read_reference_files(
else:
return "codebase_analysis", "medium", repo_id
# Video tutorial sources (video_*.md from video scraper)
elif relative_path.name.startswith("video_"):
return "video_tutorial", "high", None
# Conflicts report (discrepancy detection)
elif "conflicts" in path_str:
return "conflicts", "medium", None

View File

@@ -0,0 +1,270 @@
"""Video metadata extraction module.
Uses yt-dlp for metadata extraction without downloading video content.
Supports YouTube, Vimeo, and local video files.
"""
import hashlib
import logging
import os
import re
from skill_seekers.cli.video_models import (
Chapter,
VideoInfo,
VideoSourceType,
)
logger = logging.getLogger(__name__)
# Optional dependency: yt-dlp
try:
import yt_dlp
HAS_YTDLP = True
except ImportError:
HAS_YTDLP = False
# =============================================================================
# Video ID Extraction
# =============================================================================
# YouTube URL patterns
YOUTUBE_PATTERNS = [
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})"),
re.compile(r"(?:https?://)?youtu\.be/([a-zA-Z0-9_-]{11})"),
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/embed/([a-zA-Z0-9_-]{11})"),
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/v/([a-zA-Z0-9_-]{11})"),
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/shorts/([a-zA-Z0-9_-]{11})"),
]
YOUTUBE_PLAYLIST_PATTERN = re.compile(
r"(?:https?://)?(?:www\.)?youtube\.com/playlist\?list=([a-zA-Z0-9_-]+)"
)
YOUTUBE_CHANNEL_PATTERNS = [
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/@([a-zA-Z0-9_-]+)"),
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/channel/([a-zA-Z0-9_-]+)"),
re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/c/([a-zA-Z0-9_-]+)"),
]
VIMEO_PATTERN = re.compile(r"(?:https?://)?(?:www\.)?vimeo\.com/(\d+)")
def extract_video_id(url: str) -> str | None:
"""Extract YouTube video ID from various URL formats.
Args:
url: YouTube URL in any supported format.
Returns:
11-character video ID, or None if not a YouTube URL.
"""
for pattern in YOUTUBE_PATTERNS:
match = pattern.search(url)
if match:
return match.group(1)
return None
def detect_video_source_type(url_or_path: str) -> VideoSourceType:
    """Classify a video URL or filesystem path.

    Existing filesystem entries are checked first (file, then directory).
    Otherwise the lower-cased input is searched for known host names.
    Anything unrecognised is reported as a local file.

    Args:
        url_or_path: URL or local file path.

    Returns:
        The matching ``VideoSourceType`` member.
    """
    if os.path.isfile(url_or_path):
        return VideoSourceType.LOCAL_FILE
    if os.path.isdir(url_or_path):
        return VideoSourceType.LOCAL_DIRECTORY

    lowered = url_or_path.lower()
    if any(host in lowered for host in ("youtube.com", "youtu.be")):
        return VideoSourceType.YOUTUBE
    if "vimeo.com" in lowered:
        return VideoSourceType.VIMEO

    # Fallback for inputs that are neither an existing path nor a known host
    return VideoSourceType.LOCAL_FILE
# =============================================================================
# YouTube Metadata via yt-dlp
# =============================================================================
def _check_ytdlp():
    """Ensure yt-dlp was importable; otherwise raise with install hints.

    Raises:
        RuntimeError: If the module-level ``HAS_YTDLP`` flag is false.
    """
    if HAS_YTDLP:
        return
    raise RuntimeError(
        "yt-dlp is required for video metadata extraction.\n"
        'Install with: pip install "skill-seekers[video]"\n'
        "Or: pip install yt-dlp"
    )
def extract_youtube_metadata(url: str) -> VideoInfo:
    """Extract metadata from a YouTube video URL without downloading.

    Optional fields that yt-dlp reports as ``None`` (for example a missing
    duration) are replaced by neutral defaults rather than passed through
    to conversions that would fail on ``None``.

    Args:
        url: YouTube video URL.

    Returns:
        VideoInfo with metadata populated.

    Raises:
        RuntimeError: If yt-dlp is not installed.
    """
    _check_ytdlp()

    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": False,
        "skip_download": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)

    # Prefer the extractor's id; fall back to parsing the URL, then to a
    # placeholder. ``or`` also covers an id key that is present but empty.
    video_id = info.get("id") or extract_video_id(url) or "unknown"

    # Parse chapters. A chapter ends where the next one starts; only the
    # last chapter relies on its own "end_time".
    chapters = []
    raw_chapters = info.get("chapters") or []
    for i, ch in enumerate(raw_chapters):
        end_time = ch.get("end_time", 0)
        if i + 1 < len(raw_chapters):
            end_time = raw_chapters[i + 1].get("start_time", end_time)
        chapters.append(
            Chapter(
                title=ch.get("title", f"Chapter {i + 1}"),
                start_time=ch.get("start_time", 0),
                end_time=end_time,
            )
        )

    return VideoInfo(
        video_id=video_id,
        source_type=VideoSourceType.YOUTUBE,
        source_url=url,
        title=info.get("title") or "",
        description=info.get("description") or "",
        # float(None) raises TypeError, so a present-but-None duration
        # is mapped to 0.
        duration=float(info.get("duration") or 0),
        upload_date=info.get("upload_date"),
        language=info.get("language") or "en",
        channel_name=info.get("channel") or info.get("uploader"),
        channel_url=info.get("channel_url") or info.get("uploader_url"),
        view_count=info.get("view_count"),
        like_count=info.get("like_count"),
        comment_count=info.get("comment_count"),
        tags=info.get("tags") or [],
        categories=info.get("categories") or [],
        thumbnail_url=info.get("thumbnail"),
        chapters=chapters,
    )
def extract_local_metadata(file_path: str) -> VideoInfo:
    """Build a minimal VideoInfo for a video stored on disk.

    The video ID is the first 16 hex characters of the SHA-256 digest of the
    absolute path. The title is the file name without its extension, with
    dashes and underscores turned into spaces and then title-cased.

    Args:
        file_path: Path to video file.

    Returns:
        VideoInfo carrying the ID, absolute path and derived title; the
        duration is left at 0.0 because the file itself is never opened.
    """
    absolute = os.path.abspath(file_path)
    stem, _extension = os.path.splitext(os.path.basename(absolute))
    digest = hashlib.sha256(absolute.encode()).hexdigest()
    readable_title = stem.replace("-", " ").replace("_", " ").title()

    return VideoInfo(
        video_id=digest[:16],
        source_type=VideoSourceType.LOCAL_FILE,
        file_path=absolute,
        title=readable_title,
        duration=0.0,  # an accurate value would need a media probe such as ffprobe
    )
# =============================================================================
# Playlist / Channel Resolution
# =============================================================================
def _flat_entries_to_urls(entries) -> list[str]:
    """Convert flat-extraction entries into video URLs, skipping unusable ones."""
    video_urls = []
    for entry in entries:
        if not entry:
            # Defensive: tolerate None/empty placeholders instead of crashing
            # on ``entry.get``.
            continue
        vid_url = entry.get("url") or entry.get("webpage_url")
        if vid_url:
            video_urls.append(vid_url)
        elif entry.get("id"):
            video_urls.append(f"https://www.youtube.com/watch?v={entry['id']}")
    return video_urls


def _resolve_flat_urls(url: str, **extra_opts) -> list[str]:
    """Run a flat (no per-video extraction) yt-dlp lookup and return entry URLs."""
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,
        "skip_download": True,
    }
    ydl_opts.update(extra_opts)
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
    # ``info`` itself, or its "entries" value, may be missing/None.
    entries = (info or {}).get("entries") or []
    return _flat_entries_to_urls(entries)


def resolve_playlist(url: str) -> list[str]:
    """Resolve a YouTube playlist URL to a list of video URLs.

    Args:
        url: YouTube playlist URL.

    Returns:
        List of video URLs in playlist order.

    Raises:
        RuntimeError: If yt-dlp is not installed.
    """
    _check_ytdlp()
    return _resolve_flat_urls(url)


def resolve_channel(url: str, max_videos: int = 50) -> list[str]:
    """Resolve a YouTube channel URL to a list of recent video URLs.

    Args:
        url: YouTube channel URL.
        max_videos: Maximum number of videos to resolve.

    Returns:
        List of video URLs, at most ``max_videos`` long, in the order yt-dlp
        reports them.

    Raises:
        RuntimeError: If yt-dlp is not installed.
    """
    _check_ytdlp()
    video_urls = _resolve_flat_urls(url, playlistend=max_videos)
    # The slice enforces the cap even if the extractor returns more entries
    # than ``playlistend`` asked for.
    return video_urls[:max_videos]

View File

@@ -0,0 +1,813 @@
"""Video source data models and type definitions.

Defines all enumerations and dataclasses for the video extraction pipeline:
- Enums: VideoSourceType, TranscriptSource, FrameType, CodeContext, SegmentContentType,
  SegmentationStrategy
- Core: VideoInfo, VideoSegment, VideoScraperResult
- Supporting: Chapter, TranscriptSegment, WordTimestamp, KeyFrame, OCRRegion,
  FrameSubSection, CodeBlock, TextGroupEdit, TextGroup, TextGroupTimeline,
  AudioVisualAlignment
- Config: VideoSourceConfig
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
# =============================================================================
# Enumerations
# =============================================================================
class VideoSourceType(Enum):
    """Origin of a video.

    Member values are the lowercase strings written out when a model is
    serialized via ``.value``.
    """

    YOUTUBE = "youtube"
    VIMEO = "vimeo"
    LOCAL_FILE = "local_file"
    LOCAL_DIRECTORY = "local_directory"
class TranscriptSource(Enum):
    """Provenance of a transcript.

    ``NONE`` is the value used as the default on the transcript-bearing
    models in this module.
    """

    YOUTUBE_MANUAL = "youtube_manual"
    YOUTUBE_AUTO = "youtube_auto_generated"
    WHISPER = "whisper"
    SUBTITLE_FILE = "subtitle_file"
    NONE = "none"
class FrameType(Enum):
    """Visual category assigned to a keyframe or to a panel within it."""

    CODE_EDITOR = "code_editor"
    TERMINAL = "terminal"
    SLIDE = "slide"
    DIAGRAM = "diagram"
    BROWSER = "browser"
    WEBCAM = "webcam"
    SCREENCAST = "screencast"
    OTHER = "other"
class CodeContext(Enum):
    """Kind of on-screen surface a piece of code was shown in."""

    EDITOR = "editor"
    TERMINAL = "terminal"
    SLIDE = "slide"
    BROWSER = "browser"
    UNKNOWN = "unknown"
class SegmentContentType(Enum):
    """Dominant kind of content in a video segment."""

    EXPLANATION = "explanation"
    LIVE_CODING = "live_coding"
    DEMO = "demo"
    SLIDES = "slides"
    Q_AND_A = "q_and_a"
    INTRO = "intro"
    OUTRO = "outro"
    MIXED = "mixed"
class SegmentationStrategy(Enum):
    """Method used to split a video into segments."""

    CHAPTERS = "chapters"
    TIME_WINDOW = "time_window"
    SCENE_CHANGE = "scene_change"
    HYBRID = "hybrid"
# =============================================================================
# Supporting Data Classes
# =============================================================================
@dataclass(frozen=True)
class Chapter:
    """Immutable chapter marker: a titled time range inside a video."""

    title: str
    start_time: float
    end_time: float

    @property
    def duration(self) -> float:
        """Chapter length, computed as ``end_time - start_time``."""
        span = self.end_time - self.start_time
        return span

    def to_dict(self) -> dict:
        """Serialize the three fields into a plain dict."""
        return dict(
            title=self.title,
            start_time=self.start_time,
            end_time=self.end_time,
        )

    @classmethod
    def from_dict(cls, data: dict) -> Chapter:
        """Rebuild a chapter from a dict; all three keys are required."""
        title = data["title"]
        begins = data["start_time"]
        finishes = data["end_time"]
        return cls(title=title, start_time=begins, end_time=finishes)
@dataclass(frozen=True)
class WordTimestamp:
    """Immutable record of one word and the time span it covers."""

    word: str
    start: float
    end: float
    probability: float = 1.0

    def to_dict(self) -> dict:
        """Serialize all four fields into a plain dict."""
        return dict(
            word=self.word,
            start=self.start,
            end=self.end,
            probability=self.probability,
        )

    @classmethod
    def from_dict(cls, data: dict) -> WordTimestamp:
        """Rebuild from a dict; ``probability`` falls back to 1.0 when absent."""
        token = data["word"]
        begins = data["start"]
        finishes = data["end"]
        return cls(
            word=token,
            start=begins,
            end=finishes,
            probability=data.get("probability", 1.0),
        )
@dataclass(frozen=True)
class TranscriptSegment:
    """Immutable raw transcript segment, optionally with per-word timings."""

    text: str
    start: float
    end: float
    confidence: float = 1.0
    words: list[WordTimestamp] | None = None
    source: TranscriptSource = TranscriptSource.NONE

    def to_dict(self) -> dict:
        """Serialize to a plain dict.

        ``words`` is emitted as ``None`` when it is unset or empty, and
        ``source`` as its enum value.
        """
        serialized_words = None
        if self.words:
            serialized_words = [w.to_dict() for w in self.words]
        return {
            "text": self.text,
            "start": self.start,
            "end": self.end,
            "confidence": self.confidence,
            "words": serialized_words,
            "source": self.source.value,
        }

    @classmethod
    def from_dict(cls, data: dict) -> TranscriptSegment:
        """Rebuild from a dict; ``text``, ``start`` and ``end`` are required."""
        raw_words = data.get("words")
        parsed_words = (
            [WordTimestamp.from_dict(w) for w in data["words"]] if raw_words else None
        )
        return cls(
            text=data["text"],
            start=data["start"],
            end=data["end"],
            confidence=data.get("confidence", 1.0),
            words=parsed_words,
            source=TranscriptSource(data.get("source", "none")),
        )
@dataclass(frozen=True)
class OCRRegion:
    """Immutable OCR hit: recognized text, its confidence and bounding box."""

    text: str
    confidence: float
    bbox: tuple[int, int, int, int]
    is_monospace: bool = False

    def to_dict(self) -> dict:
        """Serialize to a plain dict; ``bbox`` is emitted as a list."""
        box_as_list = list(self.bbox)
        return dict(
            text=self.text,
            confidence=self.confidence,
            bbox=box_as_list,
            is_monospace=self.is_monospace,
        )

    @classmethod
    def from_dict(cls, data: dict) -> OCRRegion:
        """Rebuild from a dict; ``bbox`` is converted back to a tuple."""
        recognized = data["text"]
        score = data["confidence"]
        box = tuple(data["bbox"])
        return cls(
            text=recognized,
            confidence=score,
            bbox=box,
            is_monospace=data.get("is_monospace", False),
        )
@dataclass
class FrameSubSection:
    """One panel/region of a video frame carrying its own OCR result.

    Keeping a separate record per panel lets side-by-side panels (for
    example a code editor next to a terminal) hold independent text rather
    than one merged blob.
    """

    bbox: tuple[int, int, int, int]  # (x1, y1, x2, y2)
    frame_type: FrameType = FrameType.OTHER
    ocr_text: str = ""
    ocr_regions: list[OCRRegion] = field(default_factory=list)
    ocr_confidence: float = 0.0
    panel_id: str = ""  # e.g. "panel_0_0" (row_col)

    def to_dict(self) -> dict:
        """Serialize to a plain dict (bbox as list, frame type as its value)."""
        region_dicts = [region.to_dict() for region in self.ocr_regions]
        return {
            "bbox": list(self.bbox),
            "frame_type": self.frame_type.value,
            "ocr_text": self.ocr_text,
            "ocr_regions": region_dicts,
            "ocr_confidence": self.ocr_confidence,
            "panel_id": self.panel_id,
        }

    @classmethod
    def from_dict(cls, data: dict) -> FrameSubSection:
        """Rebuild from a dict; only ``bbox`` is required."""
        box = tuple(data["bbox"])
        kind = FrameType(data.get("frame_type", "other"))
        regions = [OCRRegion.from_dict(item) for item in data.get("ocr_regions", [])]
        return cls(
            bbox=box,
            frame_type=kind,
            ocr_text=data.get("ocr_text", ""),
            ocr_regions=regions,
            ocr_confidence=data.get("ocr_confidence", 0.0),
            panel_id=data.get("panel_id", ""),
        )
@dataclass
class KeyFrame:
    """A frame pulled from the video together with its visual analysis."""

    timestamp: float
    image_path: str
    frame_type: FrameType = FrameType.OTHER
    scene_change_score: float = 0.0
    ocr_regions: list[OCRRegion] = field(default_factory=list)
    ocr_text: str = ""
    ocr_confidence: float = 0.0
    width: int = 0
    height: int = 0
    sub_sections: list[FrameSubSection] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to a plain dict, recursing into regions and sub-sections."""
        region_dicts = [region.to_dict() for region in self.ocr_regions]
        panel_dicts = [panel.to_dict() for panel in self.sub_sections]
        return {
            "timestamp": self.timestamp,
            "image_path": self.image_path,
            "frame_type": self.frame_type.value,
            "scene_change_score": self.scene_change_score,
            "ocr_regions": region_dicts,
            "ocr_text": self.ocr_text,
            "ocr_confidence": self.ocr_confidence,
            "width": self.width,
            "height": self.height,
            "sub_sections": panel_dicts,
        }

    @classmethod
    def from_dict(cls, data: dict) -> KeyFrame:
        """Rebuild from a dict; ``timestamp`` and ``image_path`` are required."""
        moment = data["timestamp"]
        image = data["image_path"]
        kind = FrameType(data.get("frame_type", "other"))
        regions = [OCRRegion.from_dict(item) for item in data.get("ocr_regions", [])]
        panels = [FrameSubSection.from_dict(item) for item in data.get("sub_sections", [])]
        return cls(
            timestamp=moment,
            image_path=image,
            frame_type=kind,
            scene_change_score=data.get("scene_change_score", 0.0),
            ocr_regions=regions,
            ocr_text=data.get("ocr_text", ""),
            ocr_confidence=data.get("ocr_confidence", 0.0),
            width=data.get("width", 0),
            height=data.get("height", 0),
            sub_sections=panels,
        )
@dataclass
class CodeBlock:
    """Code recovered by OCR from video frames."""

    code: str
    language: str | None = None
    source_frame: float = 0.0
    context: CodeContext = CodeContext.UNKNOWN
    confidence: float = 0.0
    text_group_id: str = ""

    def to_dict(self) -> dict:
        """Serialize to a plain dict; ``context`` is emitted as its enum value."""
        return dict(
            code=self.code,
            language=self.language,
            source_frame=self.source_frame,
            context=self.context.value,
            confidence=self.confidence,
            text_group_id=self.text_group_id,
        )

    @classmethod
    def from_dict(cls, data: dict) -> CodeBlock:
        """Rebuild from a dict; only ``code`` is required."""
        source_code = data["code"]
        shown_in = CodeContext(data.get("context", "unknown"))
        return cls(
            code=source_code,
            language=data.get("language"),
            source_frame=data.get("source_frame", 0.0),
            context=shown_in,
            confidence=data.get("confidence", 0.0),
            text_group_id=data.get("text_group_id", ""),
        )
@dataclass
class TextGroupEdit:
    """Line-level change recorded between two appearances of a text group."""

    timestamp: float
    added_lines: list[str] = field(default_factory=list)
    removed_lines: list[str] = field(default_factory=list)
    modified_lines: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to a plain dict (the line lists are passed through as-is)."""
        return dict(
            timestamp=self.timestamp,
            added_lines=self.added_lines,
            removed_lines=self.removed_lines,
            modified_lines=self.modified_lines,
        )

    @classmethod
    def from_dict(cls, data: dict) -> TextGroupEdit:
        """Rebuild from a dict; only ``timestamp`` is required."""
        moment = data["timestamp"]
        added = data.get("added_lines", [])
        removed = data.get("removed_lines", [])
        modified = data.get("modified_lines", [])
        return cls(
            timestamp=moment,
            added_lines=added,
            removed_lines=removed,
            modified_lines=modified,
        )
@dataclass
class TextGroup:
    """Related text blocks followed across the video.

    Models one code file/snippet as it shows up and changes over several
    frames.
    """

    group_id: str
    appearances: list[tuple[float, float]] = field(default_factory=list)
    consensus_lines: list[dict] = field(default_factory=list)
    edits: list[TextGroupEdit] = field(default_factory=list)
    detected_language: str | None = None
    frame_type: FrameType = FrameType.CODE_EDITOR
    panel_id: str = ""  # Tracks which panel this group originated from

    @property
    def full_text(self) -> str:
        """Newline-joined ``"text"`` of every consensus line with non-empty text."""
        kept = []
        for line in self.consensus_lines:
            if line.get("text"):
                kept.append(line["text"])
        return "\n".join(kept)

    def to_dict(self) -> dict:
        """Serialize to a plain dict, including the derived ``full_text``."""
        spans = [[begin, finish] for begin, finish in self.appearances]
        edit_dicts = [edit.to_dict() for edit in self.edits]
        return {
            "group_id": self.group_id,
            "appearances": spans,
            "consensus_lines": self.consensus_lines,
            "edits": edit_dicts,
            "detected_language": self.detected_language,
            "frame_type": self.frame_type.value,
            "panel_id": self.panel_id,
            "full_text": self.full_text,
        }

    @classmethod
    def from_dict(cls, data: dict) -> TextGroup:
        """Rebuild from a dict; ``group_id`` is required, ``full_text`` is ignored."""
        identifier = data["group_id"]
        spans = [tuple(span) for span in data.get("appearances", [])]
        lines = data.get("consensus_lines", [])
        edits = [TextGroupEdit.from_dict(item) for item in data.get("edits", [])]
        language = data.get("detected_language")
        kind = FrameType(data.get("frame_type", "code_editor"))
        return cls(
            group_id=identifier,
            appearances=spans,
            consensus_lines=lines,
            edits=edits,
            detected_language=language,
            frame_type=kind,
            panel_id=data.get("panel_id", ""),
        )
@dataclass
class TextGroupTimeline:
    """All text groups of a video plus aggregate counters."""

    text_groups: list[TextGroup] = field(default_factory=list)
    total_code_time: float = 0.0
    total_groups: int = 0
    total_edits: int = 0

    def get_groups_at_time(self, timestamp: float) -> list[TextGroup]:
        """Return the groups having an appearance span that contains ``timestamp``.

        Both ends of a span are inclusive; each group is listed at most once.
        """
        visible = []
        for group in self.text_groups:
            for begin, finish in group.appearances:
                if begin <= timestamp <= finish:
                    visible.append(group)
                    break
        return visible

    def to_dict(self) -> dict:
        """Serialize to a plain dict, recursing into the text groups."""
        group_dicts = [group.to_dict() for group in self.text_groups]
        return dict(
            text_groups=group_dicts,
            total_code_time=self.total_code_time,
            total_groups=self.total_groups,
            total_edits=self.total_edits,
        )

    @classmethod
    def from_dict(cls, data: dict) -> TextGroupTimeline:
        """Rebuild from a dict; every key is optional."""
        groups = [TextGroup.from_dict(item) for item in data.get("text_groups", [])]
        return cls(
            text_groups=groups,
            total_code_time=data.get("total_code_time", 0.0),
            total_groups=data.get("total_groups", 0),
            total_edits=data.get("total_edits", 0),
        )
@dataclass
class AudioVisualAlignment:
    """Pairs a text group's on-screen code with the transcript text for a time span."""

    text_group_id: str
    start_time: float
    end_time: float
    on_screen_code: str
    transcript_during: str
    language: str | None = None

    def to_dict(self) -> dict:
        """Serialize all six fields into a plain dict."""
        return dict(
            text_group_id=self.text_group_id,
            start_time=self.start_time,
            end_time=self.end_time,
            on_screen_code=self.on_screen_code,
            transcript_during=self.transcript_during,
            language=self.language,
        )

    @classmethod
    def from_dict(cls, data: dict) -> AudioVisualAlignment:
        """Rebuild from a dict; ``transcript_during`` and ``language`` are optional."""
        group = data["text_group_id"]
        begins = data["start_time"]
        finishes = data["end_time"]
        shown = data["on_screen_code"]
        return cls(
            text_group_id=group,
            start_time=begins,
            end_time=finishes,
            on_screen_code=shown,
            transcript_during=data.get("transcript_during", ""),
            language=data.get("language"),
        )
# =============================================================================
# Core Data Classes
# =============================================================================
@dataclass
class VideoSegment:
    """One time range of a video with its transcript, visual data and metadata."""

    index: int
    start_time: float
    end_time: float
    duration: float

    # Stream 1: ASR (Audio)
    transcript: str = ""
    words: list[WordTimestamp] = field(default_factory=list)
    transcript_confidence: float = 0.0

    # Stream 2: OCR (Visual)
    keyframes: list[KeyFrame] = field(default_factory=list)
    ocr_text: str = ""
    detected_code_blocks: list[CodeBlock] = field(default_factory=list)
    has_code_on_screen: bool = False
    has_slides: bool = False
    has_diagram: bool = False

    # Stream 3: Metadata
    chapter_title: str | None = None
    topic: str | None = None
    category: str | None = None

    # Merged content
    content: str = ""
    summary: str | None = None

    # Quality metadata
    confidence: float = 0.0
    content_type: SegmentContentType = SegmentContentType.MIXED

    def to_dict(self) -> dict:
        """Serialize to a plain dict, recursing into words, keyframes and code blocks."""
        word_dicts = [word.to_dict() for word in self.words]
        frame_dicts = [frame.to_dict() for frame in self.keyframes]
        code_dicts = [block.to_dict() for block in self.detected_code_blocks]
        return {
            "index": self.index,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "duration": self.duration,
            "transcript": self.transcript,
            "words": word_dicts,
            "transcript_confidence": self.transcript_confidence,
            "keyframes": frame_dicts,
            "ocr_text": self.ocr_text,
            "detected_code_blocks": code_dicts,
            "has_code_on_screen": self.has_code_on_screen,
            "has_slides": self.has_slides,
            "has_diagram": self.has_diagram,
            "chapter_title": self.chapter_title,
            "topic": self.topic,
            "category": self.category,
            "content": self.content,
            "summary": self.summary,
            "confidence": self.confidence,
            "content_type": self.content_type.value,
        }

    @classmethod
    def from_dict(cls, data: dict) -> VideoSegment:
        """Rebuild from a dict.

        ``index``, ``start_time``, ``end_time`` and ``duration`` are required;
        every other key falls back to the field default.
        """
        # Required keys first, then nested objects, in the same order the
        # constructor arguments are listed.
        position = data["index"]
        begins = data["start_time"]
        finishes = data["end_time"]
        length = data["duration"]
        words = [WordTimestamp.from_dict(item) for item in data.get("words", [])]
        frames = [KeyFrame.from_dict(item) for item in data.get("keyframes", [])]
        code_blocks = [
            CodeBlock.from_dict(item) for item in data.get("detected_code_blocks", [])
        ]
        kind = SegmentContentType(data.get("content_type", "mixed"))
        return cls(
            index=position,
            start_time=begins,
            end_time=finishes,
            duration=length,
            transcript=data.get("transcript", ""),
            words=words,
            transcript_confidence=data.get("transcript_confidence", 0.0),
            keyframes=frames,
            ocr_text=data.get("ocr_text", ""),
            detected_code_blocks=code_blocks,
            has_code_on_screen=data.get("has_code_on_screen", False),
            has_slides=data.get("has_slides", False),
            has_diagram=data.get("has_diagram", False),
            chapter_title=data.get("chapter_title"),
            topic=data.get("topic"),
            category=data.get("category"),
            content=data.get("content", ""),
            summary=data.get("summary"),
            confidence=data.get("confidence", 0.0),
            content_type=kind,
        )

    @property
    def timestamp_display(self) -> str:
        """Human-readable range, e.g. '05:30 - 08:15'.

        Both ends switch to ``H:MM:SS`` as soon as either one reaches an hour.
        """
        with_hours = self.start_time >= 3600 or self.end_time >= 3600

        def render(moment: float) -> str:
            minutes, seconds = divmod(int(moment), 60)
            if with_hours:
                hours, minutes = divmod(minutes, 60)
                return f"{hours:d}:{minutes:02d}:{seconds:02d}"
            return f"{minutes:02d}:{seconds:02d}"

        return f"{render(self.start_time)} - {render(self.end_time)}"
@dataclass
class VideoInfo:
    """Metadata plus extracted content for one video."""

    # Identity
    video_id: str
    source_type: VideoSourceType
    source_url: str | None = None
    file_path: str | None = None

    # Basic metadata
    title: str = ""
    description: str = ""
    duration: float = 0.0
    upload_date: str | None = None
    language: str = "en"

    # Channel / Author
    channel_name: str | None = None
    channel_url: str | None = None

    # Engagement metadata
    view_count: int | None = None
    like_count: int | None = None
    comment_count: int | None = None

    # Discovery metadata
    tags: list[str] = field(default_factory=list)
    categories: list[str] = field(default_factory=list)
    thumbnail_url: str | None = None

    # Structure
    chapters: list[Chapter] = field(default_factory=list)

    # Playlist context
    playlist_title: str | None = None
    playlist_index: int | None = None
    playlist_total: int | None = None

    # Extracted content
    raw_transcript: list[TranscriptSegment] = field(default_factory=list)
    segments: list[VideoSegment] = field(default_factory=list)

    # Processing metadata
    transcript_source: TranscriptSource = TranscriptSource.NONE
    visual_extraction_enabled: bool = False
    whisper_model: str | None = None
    processing_time_seconds: float = 0.0
    extracted_at: str = ""

    # Quality scores
    transcript_confidence: float = 0.0
    content_richness_score: float = 0.0

    # Consensus-based text tracking (Phase A-D)
    text_group_timeline: TextGroupTimeline | None = None
    audio_visual_alignments: list[AudioVisualAlignment] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to a plain dict, recursing into every nested model.

        Enums are emitted as their values; a missing timeline becomes ``None``.
        """
        chapter_dicts = [chapter.to_dict() for chapter in self.chapters]
        transcript_dicts = [piece.to_dict() for piece in self.raw_transcript]
        segment_dicts = [segment.to_dict() for segment in self.segments]
        timeline_dict = None
        if self.text_group_timeline:
            timeline_dict = self.text_group_timeline.to_dict()
        alignment_dicts = [pair.to_dict() for pair in self.audio_visual_alignments]
        return {
            "video_id": self.video_id,
            "source_type": self.source_type.value,
            "source_url": self.source_url,
            "file_path": self.file_path,
            "title": self.title,
            "description": self.description,
            "duration": self.duration,
            "upload_date": self.upload_date,
            "language": self.language,
            "channel_name": self.channel_name,
            "channel_url": self.channel_url,
            "view_count": self.view_count,
            "like_count": self.like_count,
            "comment_count": self.comment_count,
            "tags": self.tags,
            "categories": self.categories,
            "thumbnail_url": self.thumbnail_url,
            "chapters": chapter_dicts,
            "playlist_title": self.playlist_title,
            "playlist_index": self.playlist_index,
            "playlist_total": self.playlist_total,
            "raw_transcript": transcript_dicts,
            "segments": segment_dicts,
            "transcript_source": self.transcript_source.value,
            "visual_extraction_enabled": self.visual_extraction_enabled,
            "whisper_model": self.whisper_model,
            "processing_time_seconds": self.processing_time_seconds,
            "extracted_at": self.extracted_at,
            "transcript_confidence": self.transcript_confidence,
            "content_richness_score": self.content_richness_score,
            "text_group_timeline": timeline_dict,
            "audio_visual_alignments": alignment_dicts,
        }

    @classmethod
    def from_dict(cls, data: dict) -> VideoInfo:
        """Rebuild from a dict.

        ``video_id`` and ``source_type`` are required; every other key falls
        back to the field default.
        """
        timeline_payload = data.get("text_group_timeline")
        timeline = None
        if timeline_payload:
            timeline = TextGroupTimeline.from_dict(timeline_payload)

        # Required keys and nested models are resolved in the same order as
        # the constructor arguments below.
        identifier = data["video_id"]
        origin = VideoSourceType(data["source_type"])
        chapters = [Chapter.from_dict(item) for item in data.get("chapters", [])]
        transcript = [
            TranscriptSegment.from_dict(item) for item in data.get("raw_transcript", [])
        ]
        segments = [VideoSegment.from_dict(item) for item in data.get("segments", [])]
        transcript_origin = TranscriptSource(data.get("transcript_source", "none"))
        alignments = [
            AudioVisualAlignment.from_dict(item)
            for item in data.get("audio_visual_alignments", [])
        ]

        return cls(
            video_id=identifier,
            source_type=origin,
            source_url=data.get("source_url"),
            file_path=data.get("file_path"),
            title=data.get("title", ""),
            description=data.get("description", ""),
            duration=data.get("duration", 0.0),
            upload_date=data.get("upload_date"),
            language=data.get("language", "en"),
            channel_name=data.get("channel_name"),
            channel_url=data.get("channel_url"),
            view_count=data.get("view_count"),
            like_count=data.get("like_count"),
            comment_count=data.get("comment_count"),
            tags=data.get("tags", []),
            categories=data.get("categories", []),
            thumbnail_url=data.get("thumbnail_url"),
            chapters=chapters,
            playlist_title=data.get("playlist_title"),
            playlist_index=data.get("playlist_index"),
            playlist_total=data.get("playlist_total"),
            raw_transcript=transcript,
            segments=segments,
            transcript_source=transcript_origin,
            visual_extraction_enabled=data.get("visual_extraction_enabled", False),
            whisper_model=data.get("whisper_model"),
            processing_time_seconds=data.get("processing_time_seconds", 0.0),
            extracted_at=data.get("extracted_at", ""),
            transcript_confidence=data.get("transcript_confidence", 0.0),
            content_richness_score=data.get("content_richness_score", 0.0),
            text_group_timeline=timeline,
            audio_visual_alignments=alignments,
        )
@dataclass
class VideoSourceConfig:
    """Settings describing which video source to process and how."""

    # Source specification (exactly one should be set)
    url: str | None = None
    playlist: str | None = None
    channel: str | None = None
    path: str | None = None
    directory: str | None = None

    # Identity
    name: str = "video"
    description: str = ""

    # Filtering
    max_videos: int = 50
    languages: list[str] | None = None

    # Extraction
    visual_extraction: bool = False
    whisper_model: str = "base"

    # Segmentation
    time_window_seconds: float = 120.0
    min_segment_duration: float = 10.0
    max_segment_duration: float = 600.0

    # Categorization
    categories: dict[str, list[str]] | None = None

    # Subtitle files
    subtitle_patterns: list[str] | None = None

    @classmethod
    def from_dict(cls, data: dict) -> VideoSourceConfig:
        """Build a config from a dict, using the field defaults for absent keys."""
        key_defaults = (
            ("url", None),
            ("playlist", None),
            ("channel", None),
            ("path", None),
            ("directory", None),
            ("name", "video"),
            ("description", ""),
            ("max_videos", 50),
            ("languages", None),
            ("visual_extraction", False),
            ("whisper_model", "base"),
            ("time_window_seconds", 120.0),
            ("min_segment_duration", 10.0),
            ("max_segment_duration", 600.0),
            ("categories", None),
            ("subtitle_patterns", None),
        )
        return cls(**{key: data.get(key, fallback) for key, fallback in key_defaults})

    def validate(self) -> list[str]:
        """Check that exactly one source field is set (i.e. is not ``None``).

        Returns:
            A list of error messages; empty when the configuration is valid.
        """
        candidates = (self.url, self.playlist, self.channel, self.path, self.directory)
        provided = len([value for value in candidates if value is not None])

        problems = []
        if provided == 0:
            problems.append(
                "Video source must specify one of: url, playlist, channel, path, directory"
            )
        elif provided > 1:
            problems.append("Video source must specify exactly one source type")
        return problems
@dataclass
class VideoScraperResult:
    """Aggregate outcome of a scraper run: videos, totals, warnings and errors."""

    videos: list[VideoInfo] = field(default_factory=list)
    total_duration_seconds: float = 0.0
    total_segments: int = 0
    total_code_blocks: int = 0
    config: VideoSourceConfig | None = None
    processing_time_seconds: float = 0.0
    warnings: list[str] = field(default_factory=list)
    errors: list[dict[str, Any]] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialize to a plain dict; the ``config`` field is not included."""
        video_dicts = [video.to_dict() for video in self.videos]
        return dict(
            videos=video_dicts,
            total_duration_seconds=self.total_duration_seconds,
            total_segments=self.total_segments,
            total_code_blocks=self.total_code_blocks,
            processing_time_seconds=self.processing_time_seconds,
            warnings=self.warnings,
            errors=self.errors,
        )

    @classmethod
    def from_dict(cls, data: dict) -> VideoScraperResult:
        """Rebuild from a dict; every key is optional and ``config`` stays ``None``."""
        videos = [VideoInfo.from_dict(item) for item in data.get("videos", [])]
        return cls(
            videos=videos,
            total_duration_seconds=data.get("total_duration_seconds", 0.0),
            total_segments=data.get("total_segments", 0),
            total_code_blocks=data.get("total_code_blocks", 0),
            processing_time_seconds=data.get("processing_time_seconds", 0.0),
            warnings=data.get("warnings", []),
            errors=data.get("errors", []),
        )

View File

@@ -0,0 +1,954 @@
#!/usr/bin/env python3
"""
Video to Claude Skill Converter
Extracts transcripts, metadata, and visual content from videos
and converts them into Claude AI skills.
Supports YouTube videos/playlists, Vimeo, and local video files.
Usage:
python3 video_scraper.py --url https://www.youtube.com/watch?v=...
python3 video_scraper.py --video-file recording.mp4
python3 video_scraper.py --playlist https://www.youtube.com/playlist?list=...
python3 video_scraper.py --from-json video_extracted.json
"""
import argparse
import json
import logging
import os
import re
import sys
import time
from skill_seekers.cli.video_models import (
AudioVisualAlignment,
TextGroupTimeline,
TranscriptSource,
VideoInfo,
VideoScraperResult,
VideoSourceConfig,
VideoSourceType,
)
logger = logging.getLogger(__name__)
# =============================================================================
# Dependency Guard
# =============================================================================
# Core video deps are optional
# Each optional package gets a module-level availability flag; a failed
# import only flips the flag and never aborts loading this module.
try:
    import yt_dlp  # noqa: F401
except ImportError:
    HAS_YTDLP = False
else:
    HAS_YTDLP = True

try:
    from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
except ImportError:
    HAS_YOUTUBE_TRANSCRIPT = False
else:
    HAS_YOUTUBE_TRANSCRIPT = True
def check_video_dependencies(require_full: bool = False) -> None:
    """Verify that the packages needed for video processing can be used.

    The base check relies on the module-level ``HAS_YTDLP`` and
    ``HAS_YOUTUBE_TRANSCRIPT`` flags. With ``require_full`` set, ``cv2`` and
    ``faster_whisper`` are additionally import-tested.

    Args:
        require_full: If True, also check Tier 2 deps (Whisper, OpenCV, etc.)

    Raises:
        RuntimeError: If required dependencies are missing; the message lists
            them together with install commands.
    """
    base_requirements = (
        (HAS_YTDLP, "yt-dlp"),
        (HAS_YOUTUBE_TRANSCRIPT, "youtube-transcript-api"),
    )
    missing = [package for available, package in base_requirements if not available]

    if require_full:
        try:
            import cv2  # noqa: F401
        except ImportError:
            missing.append("opencv-python-headless")
        try:
            import faster_whisper  # noqa: F401
        except ImportError:
            missing.append("faster-whisper")

    if not missing:
        return

    extra = "[video-full]" if require_full else "[video]"
    deps = ", ".join(missing)
    raise RuntimeError(
        f"Missing video dependencies: {deps}\n"
        f'Install with: pip install "skill-seekers{extra}"\n'
        f"Or: pip install {' '.join(missing)}"
    )
# =============================================================================
# Helper Functions
# =============================================================================
def _sanitize_filename(title: str, max_length: int = 60) -> str:
"""Sanitize a video title for use as a filename."""
name = title.lower()
name = re.sub(r"[^a-z0-9\s-]", "", name)
name = re.sub(r"[\s]+", "-", name)
name = re.sub(r"-+", "-", name)
name = name.strip("-")
return name[:max_length]
def _format_duration(seconds: float) -> str:
"""Format seconds as HH:MM:SS or MM:SS."""
total = int(seconds)
hours, remainder = divmod(total, 3600)
minutes, secs = divmod(remainder, 60)
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def _format_count(count: int | None) -> str:
"""Format a count with commas."""
if count is None:
return "N/A"
return f"{count:,}"
def infer_description_from_video(video_info: VideoInfo, name: str = "") -> str:
    """Infer skill description from video metadata.

    Preference order: the video description (first 150 characters, stripped,
    with "..." appended when the description is longer), then the title, then
    a generic sentence built from ``name``.

    Args:
        video_info: Video whose ``description`` and ``title`` are consulted.
        name: Skill name used only in the final fallback.

    Returns:
        A lowercase-bodied sentence starting with "Use when".
    """
    description = video_info.description or ""
    desc = description[:150].strip()
    # A whitespace-only description carries no information; fall through to
    # the title instead of returning a dangling "Use when ".
    if desc:
        if len(description) > 150:
            desc += "..."
        return f"Use when {desc.lower()}"
    if video_info.title:
        return f"Use when working with {video_info.title.lower()}"
    return (
        f"Use when referencing {name} video content"
        if name
        else "Use when referencing this video content"
    )
# =============================================================================
# Audio-Visual Alignment
# =============================================================================
def _build_audio_visual_alignments(
timeline: TextGroupTimeline,
transcript_segments: list,
) -> list[AudioVisualAlignment]:
"""Build audio-visual alignments pairing on-screen code with transcript.
For each text group appearance, finds overlapping transcript segments
and pairs them into AudioVisualAlignment objects.
Args:
timeline: TextGroupTimeline with text groups and appearances.
transcript_segments: List of TranscriptSegment objects.
Returns:
List of AudioVisualAlignment objects.
"""
alignments: list[AudioVisualAlignment] = []
for group in timeline.text_groups:
for start, end in group.appearances:
# Find overlapping transcript segments
overlapping_text = []
for seg in transcript_segments:
seg_start = seg.start
seg_end = seg.end
# Check overlap
if seg_end > start and seg_start < end:
overlapping_text.append(seg.text)
transcript_during = " ".join(overlapping_text).strip()
if not transcript_during:
continue
alignments.append(
AudioVisualAlignment(
text_group_id=group.group_id,
start_time=start,
end_time=end,
on_screen_code=group.full_text,
transcript_during=transcript_during,
language=group.detected_language,
)
)
return alignments
# =============================================================================
# Main Converter Class
# =============================================================================
class VideoToSkillConverter:
    """Convert video content to Claude skill.

    ``process()`` (or ``load_extracted_data()``) populates ``self.result``;
    ``save_extracted_data()`` and ``build_skill()`` then write the JSON dump
    and the skill directory from that result.
    """

    def __init__(self, config: dict):
        """Initialize converter.

        Args:
            config: Configuration dict with keys:
                - name: Skill name (required)
                - url/video_file/playlist: Video source
                - output: Optional skill directory (default ``output/<name>``)
                - description: Optional description
                - languages: Optional comma-separated language preferences
                - visual: Whether to enable visual extraction
                - whisper_model: Whisper model size
                - visual_interval, visual_min_gap, visual_similarity,
                  vision_ocr: Values forwarded to ``extract_visual_data``
        """
        self.config = config
        self.name = config["name"]
        self.description = config.get("description", "")
        # Strip blanks so "en, es" yields ["en", "es"] instead of ["en", " es"];
        # fall back to English when nothing usable remains.
        requested = (config.get("languages") or "en").split(",")
        self.languages = [lang.strip() for lang in requested if lang.strip()] or ["en"]
        self.visual = config.get("visual", False)
        self.whisper_model = config.get("whisper_model", "base")
        self.visual_interval = config.get("visual_interval", 0.7)
        self.visual_min_gap = config.get("visual_min_gap", 0.5)
        self.visual_similarity = config.get("visual_similarity", 3.0)
        self.vision_ocr = config.get("vision_ocr", False)

        # Paths
        self.skill_dir = config.get("output") or f"output/{self.name}"
        self.data_file = f"output/{self.name}_video_extracted.json"

        # Results
        self.result: VideoScraperResult | None = None

    # ------------------------------------------------------------------
    # Extraction pipeline
    # ------------------------------------------------------------------

    def process(self) -> VideoScraperResult:
        """Run the full video processing pipeline.

        A failure while processing one video is logged and recorded in the
        result's ``errors`` list; the remaining videos are still processed.

        Returns:
            VideoScraperResult with all extracted data.

        Raises:
            RuntimeError: If visual extraction is enabled and
                ``check_visual_dependencies()`` reports a missing dependency.
        """
        from skill_seekers.cli.video_metadata import (
            detect_video_source_type,
            extract_local_metadata,
            extract_youtube_metadata,
        )
        from skill_seekers.cli.video_segmenter import segment_video
        from skill_seekers.cli.video_transcript import get_transcript

        start_time = time.time()

        # Validate visual deps upfront so we fail fast
        if self.visual:
            self._require_visual_dependencies()

        source_config = VideoSourceConfig(
            name=self.name,
            description=self.description,
            languages=self.languages,
            visual_extraction=self.visual,
            whisper_model=self.whisper_model,
        )

        videos: list[VideoInfo] = []
        warnings: list[str] = []
        errors: list[dict] = []

        urls_or_paths = self._resolve_sources(errors)

        for i, source in enumerate(urls_or_paths):
            logger.info(f"[{i + 1}/{len(urls_or_paths)}] Processing: {source}")
            # Timed per video so playlist entries do not report cumulative time.
            video_started = time.time()
            try:
                source_type = detect_video_source_type(source)

                # Extract metadata
                if source_type == VideoSourceType.YOUTUBE:
                    check_video_dependencies()
                    video_info = extract_youtube_metadata(source)
                else:
                    video_info = extract_local_metadata(source)

                # Extract transcript
                transcript_segments, transcript_source = get_transcript(video_info, source_config)
                video_info.raw_transcript = transcript_segments
                video_info.transcript_source = transcript_source

                if transcript_segments:
                    video_info.transcript_confidence = sum(
                        s.confidence for s in transcript_segments
                    ) / len(transcript_segments)
                    # Auto-generated captions are trusted less than manual ones.
                    if transcript_source == TranscriptSource.YOUTUBE_AUTO:
                        video_info.transcript_confidence *= 0.8
                else:
                    warnings.append(f"No transcript available for '{video_info.title}'")

                # Segment video
                segments = segment_video(video_info, transcript_segments, source_config)
                video_info.segments = segments

                # Visual extraction (Tier 2)
                if self.visual:
                    self._extract_visuals(source, video_info, segments, warnings)

                # Set processing metadata
                video_info.extracted_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
                video_info.visual_extraction_enabled = self.visual
                video_info.processing_time_seconds = time.time() - video_started
                videos.append(video_info)

                visual_msg = ""
                if self.visual:
                    total_kf = sum(len(s.keyframes) for s in segments)
                    total_ocr = sum(1 for s in segments for kf in s.keyframes if kf.ocr_text)
                    visual_msg = f", {total_kf} keyframes, {total_ocr} with OCR"
                logger.info(
                    f" => {len(segments)} segments, "
                    f"{len(transcript_segments)} transcript chunks, "
                    f"source: {transcript_source.value}{visual_msg}"
                )
            except Exception as e:
                # Best effort: one broken video must not abort the whole run.
                errors.append({"source": source, "error": str(e)})
                logger.error(f"Failed to process {source}: {e}")
                logger.debug("Traceback:", exc_info=True)

        # Build result
        self.result = VideoScraperResult(
            videos=videos,
            total_duration_seconds=sum(v.duration for v in videos),
            total_segments=sum(len(v.segments) for v in videos),
            total_code_blocks=sum(
                len(s.detected_code_blocks) for v in videos for s in v.segments
            ),
            config=source_config,
            processing_time_seconds=time.time() - start_time,
            warnings=warnings,
            errors=errors,
        )
        return self.result

    def _require_visual_dependencies(self) -> None:
        """Raise RuntimeError when a visual-extraction dependency is missing."""
        check_video_dependencies(require_full=True)
        from skill_seekers.cli.video_visual import check_visual_dependencies

        deps = check_visual_dependencies()
        missing = [name for name, available in deps.items() if not available]
        if missing:
            raise RuntimeError(
                f"Visual extraction requires: {', '.join(missing)}\n"
                'Install with: pip install "skill-seekers[video-full]"\n'
                "Or: pip install opencv-python-headless scenedetect easyocr"
            )

    def _resolve_sources(self, errors: list[dict]) -> list:
        """Return the URLs/paths to process; playlist failures go into *errors*."""
        if self.config.get("playlist"):
            from skill_seekers.cli.video_metadata import resolve_playlist

            logger.info("Resolving playlist...")
            try:
                check_video_dependencies()
                urls_or_paths = resolve_playlist(self.config["playlist"])
                logger.info(f"Found {len(urls_or_paths)} videos in playlist")
                return urls_or_paths
            except Exception as e:
                errors.append({"source": self.config["playlist"], "error": str(e)})
                logger.error(f"Failed to resolve playlist: {e}")
                return []
        if self.config.get("url"):
            return [self.config["url"]]
        if self.config.get("video_file"):
            return [self.config["video_file"]]
        return []

    @staticmethod
    def _segment_at(segments, timestamp):
        """Return the first segment whose [start, end) span holds *timestamp*, else None."""
        return next(
            (seg for seg in segments if seg.start_time <= timestamp < seg.end_time),
            None,
        )

    def _extract_visuals(self, source, video_info, segments, warnings: list[str]) -> None:
        """Run keyframe/OCR extraction and attach its output to *segments*.

        Remote videos are downloaded into a temporary directory that is always
        removed afterwards, including when extraction raises.
        """
        from skill_seekers.cli.video_visual import (
            download_video,
            extract_visual_data,
        )

        video_path = video_info.file_path
        temp_video_dir = None
        try:
            # Download if remote (YouTube/Vimeo)
            if not video_path or not os.path.exists(video_path):
                import tempfile

                temp_video_dir = tempfile.mkdtemp(prefix="ss_video_")
                video_path = download_video(source, temp_video_dir)

            if not (video_path and os.path.exists(video_path)):
                warnings.append(f"Could not download video for visual extraction: {source}")
                return

            keyframes, code_blocks, timeline = extract_visual_data(
                video_path,
                segments,
                self.skill_dir,
                sample_interval=self.visual_interval,
                min_gap=self.visual_min_gap,
                similarity_threshold=self.visual_similarity,
                use_vision_api=self.vision_ocr,
            )

            # Attach keyframes to segments
            for kf in keyframes:
                owner = self._segment_at(segments, kf.timestamp)
                if owner is not None:
                    owner.keyframes.append(kf)

            # Assign code blocks to segments by timestamp
            for cb in code_blocks:
                owner = self._segment_at(segments, cb.source_frame)
                if owner is not None:
                    owner.detected_code_blocks.append(cb)
                    owner.has_code_on_screen = True

            # Set timeline and build audio-visual alignments
            video_info.text_group_timeline = timeline
            if timeline:
                video_info.audio_visual_alignments = _build_audio_visual_alignments(
                    timeline, video_info.raw_transcript
                )

            logger.info(
                f" Visual: {len(keyframes)} keyframes extracted, "
                f"{sum(1 for kf in keyframes if kf.ocr_text)} with OCR text, "
                f"{len(code_blocks)} code blocks detected"
            )
        finally:
            # Clean up temp download even if extraction failed part-way.
            if temp_video_dir:
                import shutil

                shutil.rmtree(temp_video_dir, ignore_errors=True)

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_extracted_data(self) -> str:
        """Save extracted data to JSON file.

        Returns:
            Path to saved JSON file.

        Raises:
            RuntimeError: If no result is available yet.
        """
        if self.result is None:
            raise RuntimeError("No data to save. Run process() first.")

        os.makedirs(os.path.dirname(self.data_file) or ".", exist_ok=True)
        with open(self.data_file, "w", encoding="utf-8") as f:
            json.dump(self.result.to_dict(), f, indent=2, ensure_ascii=False)
        logger.info(f"Saved extracted data to {self.data_file}")
        return self.data_file

    def load_extracted_data(self, json_path: str) -> None:
        """Load previously extracted data from JSON.

        Args:
            json_path: Path to extracted JSON file.
        """
        with open(json_path, encoding="utf-8") as f:
            data = json.load(f)
        self.result = VideoScraperResult.from_dict(data)
        logger.info(f"Loaded {len(self.result.videos)} videos from {json_path}")

    # ------------------------------------------------------------------
    # Skill generation
    # ------------------------------------------------------------------

    def build_skill(self) -> str:
        """Build skill directory with SKILL.md and reference files.

        Returns:
            Path to skill directory.

        Raises:
            RuntimeError: If no result is available yet.
        """
        if self.result is None:
            raise RuntimeError(
                "No data to build from. Run process() or load_extracted_data() first."
            )

        # Create directories
        refs_dir = os.path.join(self.skill_dir, "references")
        video_data_dir = os.path.join(self.skill_dir, "video_data")
        os.makedirs(refs_dir, exist_ok=True)
        os.makedirs(video_data_dir, exist_ok=True)

        # Generate reference files for each video
        for video in self.result.videos:
            ref_filename = f"video_{_sanitize_filename(video.title)}.md"
            ref_path = os.path.join(refs_dir, ref_filename)
            with open(ref_path, "w", encoding="utf-8") as f:
                f.write(self._generate_reference_md(video))

        # Save metadata JSON
        metadata_path = os.path.join(video_data_dir, "metadata.json")
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(self.result.to_dict(), f, indent=2, ensure_ascii=False)

        # Generate SKILL.md
        skill_path = os.path.join(self.skill_dir, "SKILL.md")
        with open(skill_path, "w", encoding="utf-8") as f:
            f.write(self._generate_skill_md())

        logger.info(f"Built skill at {self.skill_dir}")
        logger.info(f" {len(self.result.videos)} videos, {self.result.total_segments} segments")
        return self.skill_dir

    @staticmethod
    def _segment_language_hint(seg) -> str:
        """Return the language of the segment's first labelled code block, or ''."""
        return next(
            (cb.language for cb in (seg.detected_code_blocks or ()) if cb.language),
            "",
        )

    def _generate_reference_md(self, video: VideoInfo) -> str:
        """Generate reference markdown file for a single video."""
        from skill_seekers.cli.video_models import FrameType

        code_frame_types = (FrameType.CODE_EDITOR, FrameType.TERMINAL)
        # Keyframe image links are written relative to the references directory.
        refs_dir = os.path.join(self.skill_dir, "references")
        lines = []

        # Title
        lines.append(f"# {video.title}\n")

        # Metadata block
        meta_parts = []
        if video.channel_name:
            if video.channel_url:
                meta_parts.append(f"**Source:** [{video.channel_name}]({video.channel_url})")
            else:
                meta_parts.append(f"**Source:** {video.channel_name}")
        if video.duration > 0:
            meta_parts.append(f"**Duration:** {_format_duration(video.duration)}")
        if video.upload_date:
            meta_parts.append(f"**Published:** {video.upload_date}")
        if meta_parts:
            lines.append("> " + " | ".join(meta_parts))

        if video.source_url:
            lines.append(f"> **URL:** [{video.source_url}]({video.source_url})")

        engagement_parts = []
        if video.view_count is not None:
            engagement_parts.append(f"**Views:** {_format_count(video.view_count)}")
        if video.like_count is not None:
            engagement_parts.append(f"**Likes:** {_format_count(video.like_count)}")
        if engagement_parts:
            lines.append("> " + " | ".join(engagement_parts))

        if video.tags:
            lines.append(f"> **Tags:** {', '.join(video.tags[:10])}")
        lines.append("")

        # Description summary
        if video.description:
            desc = video.description[:300]
            if len(video.description) > 300:
                desc += "..."
            lines.append(desc)
            lines.append("")

        lines.append("---\n")

        # Table of contents (from chapters or segments)
        if video.segments:
            lines.append("## Table of Contents\n")
            for seg in video.segments:
                label = seg.chapter_title or f"Segment {seg.index + 1}"
                lines.append(
                    f"- [{label}](#{_sanitize_filename(label)}-{seg.timestamp_display.replace(' ', '')})"
                )
            lines.append("\n---\n")

        # Segments as sections
        for seg in video.segments:
            lines.append(seg.content)
            lang_hint = self._segment_language_hint(seg)

            # Visual data (keyframes + OCR)
            # NOTE(review): OCR text is emitted even when the keyframe image
            # file is missing - confirm this matches the intended layout.
            for kf in seg.keyframes or ():
                if kf.image_path and os.path.exists(kf.image_path):
                    rel_path = os.path.relpath(kf.image_path, refs_dir)
                    lines.append(
                        f"\n> **Frame** ({kf.frame_type.value} at {_format_duration(kf.timestamp)}):"
                    )
                    lines.append(f"> ![keyframe]({rel_path})")

                if kf.sub_sections:
                    # Per-panel OCR: only code-like panels are rendered.
                    for ss in kf.sub_sections:
                        if ss.frame_type in code_frame_types and ss.ocr_text:
                            lines.append(f"\n```{lang_hint}")
                            lines.append(ss.ocr_text)
                            lines.append("```")
                elif kf.ocr_text:
                    if kf.frame_type in code_frame_types:
                        lines.append(f"\n```{lang_hint}")
                        lines.append(kf.ocr_text)
                        lines.append("```")
                    elif kf.frame_type == FrameType.SLIDE:
                        for text_line in kf.ocr_text.split("\n"):
                            if text_line.strip():
                                lines.append(f"> {text_line}")
                    else:
                        lines.append(f"> **On-screen text:** {kf.ocr_text}")

            # Detected code blocks subsection
            if seg.detected_code_blocks:
                lines.append("\n#### Detected Code\n")
                for cb in seg.detected_code_blocks:
                    lang_label = cb.language or "unknown"
                    context_label = cb.context.value if cb.context else "unknown"
                    lines.append(
                        f"**{lang_label}** ({context_label} at "
                        f"{_format_duration(cb.source_frame)}):\n"
                    )
                    lines.append(f"```{cb.language or ''}")
                    lines.append(cb.code)
                    lines.append("```\n")

            lines.append("\n---\n")

        # Code Timeline section (from text groups)
        if video.text_group_timeline and video.text_group_timeline.text_groups:
            tl = video.text_group_timeline
            lines.append("\n## Code Timeline\n")
            lines.append(
                f"> {tl.total_groups} code groups tracked, "
                f"{tl.total_edits} edits detected, "
                f"{tl.total_code_time:.0f}s of on-screen code\n"
            )
            for group in tl.text_groups:
                lang_hint = group.detected_language or ""
                lines.append(f"### {group.group_id}")
                appearance_strs = [
                    f"{_format_duration(start)} - {_format_duration(end)}"
                    for start, end in group.appearances
                ]
                lines.append(f"**Appearances:** {', '.join(appearance_strs)}\n")
                lines.append(f"```{lang_hint}")
                lines.append(group.full_text)
                lines.append("```\n")

                if group.edits:
                    lines.append("**Edits:**\n")
                    for edit in group.edits:
                        lines.append(f"- At {_format_duration(edit.timestamp)}:")
                        for line in edit.added_lines:
                            lines.append(f" + `{line}`")
                        for line in edit.removed_lines:
                            lines.append(f" - `{line}`")
                        for mod in edit.modified_lines:
                            lines.append(
                                f" ~ L{mod.get('line_num', '?')}: "
                                f"`{mod.get('old', '')}` → `{mod.get('new', '')}`"
                            )
                    lines.append("")
            lines.append("---\n")

        # Audio-Visual Alignment section
        if video.audio_visual_alignments:
            lines.append("\n## Audio-Visual Alignment\n")
            lines.append(f"> {len(video.audio_visual_alignments)} code-narration pairs\n")
            for av in video.audio_visual_alignments:
                lang_hint = av.language or ""
                lines.append(
                    f"**{av.text_group_id}** "
                    f"({_format_duration(av.start_time)} - {_format_duration(av.end_time)})\n"
                )
                lines.append(f"```{lang_hint}")
                lines.append(av.on_screen_code)
                lines.append("```\n")
                lines.append(f"> **Narrator:** {av.transcript_during}\n")
            lines.append("---\n")

        # Transcript source info
        lines.append(f"\n*Transcript source: {video.transcript_source.value}*")
        if video.transcript_confidence > 0:
            lines.append(f"*Confidence: {video.transcript_confidence:.0%}*")

        return "\n".join(lines)

    def _generate_skill_md(self) -> str:
        """Generate the main SKILL.md file."""
        lines = []
        videos = self.result.videos

        # Without an explicit description, infer one from the first video
        # (or from a placeholder VideoInfo when nothing was processed).
        desc = self.description or infer_description_from_video(
            videos[0]
            if videos
            else VideoInfo(video_id="none", source_type=VideoSourceType.YOUTUBE),
            self.name,
        )
        lines.append(f"# {self.name}\n")
        lines.append(f"{desc}\n")

        # Overview
        total_dur = _format_duration(self.result.total_duration_seconds)
        lines.append("## Overview\n")
        overview = (
            f"This skill includes knowledge extracted from "
            f"{len(videos)} video(s) totaling {total_dur} of content."
        )

        # Visual extraction summary
        all_segments = [s for v in videos for s in v.segments]
        total_kf = sum(len(s.keyframes) for s in all_segments)
        total_ocr = sum(1 for s in all_segments for kf in s.keyframes if kf.ocr_text)
        total_code = sum(len(s.detected_code_blocks) for s in all_segments)
        if total_kf > 0:
            overview += (
                f"\nVisual extraction: {total_kf} keyframes, {total_ocr} with on-screen text"
            )
            if total_code > 0:
                overview += f", {total_code} code blocks detected"
            overview += "."
        lines.append(f"{overview}\n")

        # Video tutorials section
        lines.append("## Video Tutorials\n")
        for video in videos:
            lines.append(f"### {video.title}")

            meta = []
            if video.channel_name:
                if video.source_url:
                    meta.append(f"[{video.channel_name}]({video.source_url})")
                else:
                    meta.append(video.channel_name)
            if video.duration > 0:
                meta.append(_format_duration(video.duration))
            if video.view_count is not None:
                meta.append(f"{_format_count(video.view_count)} views")
            if meta:
                lines.append(f"**Source:** {' | '.join(meta)}\n")

            # Topics covered
            topics = [s.chapter_title for s in video.segments if s.chapter_title]
            if topics:
                lines.append(f"**Topics covered:** {', '.join(topics)}\n")

            # First segment preview
            if video.segments and video.segments[0].transcript:
                first_transcript = video.segments[0].transcript
                preview = first_transcript[:200]
                if len(first_transcript) > 200:
                    preview += "..."
                lines.append(f"{preview}\n")

            ref_filename = f"video_{_sanitize_filename(video.title)}.md"
            lines.append(
                f"> Full transcript: [references/{ref_filename}](references/{ref_filename})\n"
            )
            lines.append("---\n")

        # Warnings
        if self.result.warnings:
            lines.append("## Notes\n")
            for warning in self.result.warnings:
                lines.append(f"- {warning}")
            lines.append("")

        # References
        lines.append("## References\n")
        for video in videos:
            ref_filename = f"video_{_sanitize_filename(video.title)}.md"
            lines.append(f"- [{video.title}](references/{ref_filename})")

        return "\n".join(lines)
# =============================================================================
# CLI Entry Point
# =============================================================================
def main() -> int:
    """Entry point for video scraper CLI.

    Parses command-line arguments, then either rebuilds a skill from a
    previously extracted JSON file (``--from-json``) or runs the full
    extraction, saves the JSON dump and builds the skill. Enhancement runs
    only after a full extraction and only when the enhance level is positive.

    Returns:
        Exit code (0 for success, non-zero for error).
    """
    from skill_seekers.cli.arguments.video import add_video_arguments

    parser = argparse.ArgumentParser(
        prog="skill-seekers-video",
        description="Extract transcripts and metadata from videos and generate skill",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Examples:\n"
            "skill-seekers video --url https://www.youtube.com/watch?v=...\n"
            "skill-seekers video --video-file recording.mp4\n"
            "skill-seekers video --playlist https://www.youtube.com/playlist?list=...\n"
            "skill-seekers video --from-json video_extracted.json\n"
            "skill-seekers video --url https://youtu.be/... --languages en,es\n"
        ),
    )
    add_video_arguments(parser)
    args = parser.parse_args()

    # Setup logging
    log_level = logging.DEBUG if args.verbose else (logging.WARNING if args.quiet else logging.INFO)
    logging.basicConfig(level=log_level, format="%(levelname)s: %(message)s")

    # Validate inputs
    has_source = any(
        [
            getattr(args, "url", None),
            getattr(args, "video_file", None),
            getattr(args, "playlist", None),
        ]
    )
    has_json = getattr(args, "from_json", None)
    if not has_source and not has_json:
        parser.error("Must specify --url, --video-file, --playlist, or --from-json")

    # Build config
    config = {
        "name": args.name or "video_skill",
        "description": getattr(args, "description", None) or "",
        "output": getattr(args, "output", None),
        "url": getattr(args, "url", None),
        "video_file": getattr(args, "video_file", None),
        "playlist": getattr(args, "playlist", None),
        "languages": getattr(args, "languages", "en"),
        "visual": getattr(args, "visual", False),
        "whisper_model": getattr(args, "whisper_model", "base"),
        "visual_interval": getattr(args, "visual_interval", 0.7),
        "visual_min_gap": getattr(args, "visual_min_gap", 0.5),
        "visual_similarity": getattr(args, "visual_similarity", 3.0),
        "vision_ocr": getattr(args, "vision_ocr", False),
    }
    converter = VideoToSkillConverter(config)

    # Dry run
    if args.dry_run:
        logger.info("DRY RUN — would process:")
        for key in ["url", "video_file", "playlist"]:
            if config.get(key):
                logger.info(f" {key}: {config[key]}")
        logger.info(f" name: {config['name']}")
        logger.info(f" languages: {config['languages']}")
        logger.info(f" visual: {config['visual']}")
        return 0

    # Workflow 1: Build from JSON
    if has_json:
        logger.info(f"Loading extracted data from {args.from_json}")
        try:
            converter.load_extracted_data(args.from_json)
            converter.build_skill()
        except (OSError, ValueError, RuntimeError) as e:
            # Missing/unreadable file or malformed JSON (JSONDecodeError is a
            # ValueError): report it like any other failure instead of a traceback.
            logger.error(str(e))
            return 1
        logger.info(f"Skill built at {converter.skill_dir}")
        return 0

    # Workflow 2: Full extraction + build
    try:
        result = converter.process()
        if not result.videos:
            logger.error("No videos were successfully processed")
            for err in result.errors or []:
                logger.error(f" {err['source']}: {err['error']}")
            return 1

        converter.save_extracted_data()
        converter.build_skill()

        logger.info(f"\nSkill built successfully at {converter.skill_dir}")
        logger.info(f" Videos: {len(result.videos)}")
        logger.info(f" Segments: {result.total_segments}")
        logger.info(f" Duration: {_format_duration(result.total_duration_seconds)}")
        logger.info(f" Processing time: {result.processing_time_seconds:.1f}s")
        for w in result.warnings or []:
            logger.warning(f" {w}")
    except (RuntimeError, OSError) as e:
        # OSError covers failures while writing the JSON dump or skill files.
        logger.error(str(e))
        return 1

    # Enhancement
    # "or 0" guards against the argument being present but set to None.
    enhance_level = getattr(args, "enhance_level", 0) or 0
    if enhance_level > 0:
        # Auto-inject video-tutorial workflow if no workflow specified
        if not getattr(args, "enhance_workflow", None):
            args.enhance_workflow = ["video-tutorial"]

        # Run workflow stages (specialized video analysis)
        try:
            from skill_seekers.cli.workflow_runner import run_workflows

            video_context = {
                "skill_name": converter.name,
                "skill_dir": converter.skill_dir,
                "source_type": "video_tutorial",
            }
            run_workflows(args, context=video_context)
        except ImportError:
            logger.debug("Workflow runner not available, skipping workflow stages")

        # Run traditional SKILL.md enhancement (reads references + rewrites)
        _run_video_enhancement(converter.skill_dir, enhance_level, args)

    return 0
def _run_video_enhancement(skill_dir: str, enhance_level: int, args) -> None:
    """Run the traditional SKILL.md enhancement step for a video skill.

    Invokes the ``skill-seekers-enhance`` command as a subprocess. When no
    Anthropic credential is found (``ANTHROPIC_API_KEY`` or
    ``ANTHROPIC_AUTH_TOKEN`` in the environment, or ``args.api_key``), only a
    hint on how to run the enhancement manually is logged. A failing or
    missing enhance command is logged as a warning and never raised.
    """
    import os
    import subprocess

    api_key = getattr(args, "api_key", None)
    credential = (
        os.environ.get("ANTHROPIC_API_KEY")
        or os.environ.get("ANTHROPIC_AUTH_TOKEN")
        or api_key
    )
    if not credential:
        logger.info("\n💡 Enhance your video skill with AI:")
        logger.info(" export ANTHROPIC_API_KEY=sk-ant-...")
        logger.info(f" skill-seekers enhance {skill_dir} --enhance-level {enhance_level}")
        return

    logger.info(f"\n🤖 Running video-aware SKILL.md enhancement (level {enhance_level})...")

    enhance_cmd = ["skill-seekers-enhance", skill_dir, "--enhance-level", str(enhance_level)]
    if api_key:
        enhance_cmd += ["--api-key", api_key]

    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        subprocess.run(enhance_cmd, check=True)
    except subprocess.CalledProcessError:
        logger.warning("⚠ Enhancement failed, but skill was still built")
    except FileNotFoundError:
        logger.warning("⚠ skill-seekers-enhance not found. Run manually:")
        logger.info(f" skill-seekers enhance {skill_dir} --enhance-level {enhance_level}")
    else:
        logger.info("✅ Video skill enhancement complete!")
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,218 @@
"""Video segmentation module.
Aligns transcript + metadata into VideoSegment objects using:
1. Chapter-based segmentation (primary — uses YouTube chapters)
2. Time-window segmentation (fallback — fixed-duration windows)
"""
import logging
from skill_seekers.cli.video_models import (
SegmentContentType,
TranscriptSegment,
VideoInfo,
VideoSegment,
VideoSourceConfig,
)
logger = logging.getLogger(__name__)
def _classify_content_type(transcript: str) -> SegmentContentType:
    """Guess a segment's content type from keywords in its transcript.

    Matching is case-insensitive substring search. Outro phrases are checked
    first, then intro phrases; live coding needs at least two distinct code
    keywords. Anything else is classified as explanation.
    """
    text = transcript.lower()

    def mentions_any(phrases: tuple[str, ...]) -> bool:
        return any(phrase in text for phrase in phrases)

    if mentions_any(("thanks for watching", "subscribe", "see you next", "that's it for")):
        return SegmentContentType.OUTRO

    if mentions_any(("welcome", "hello", "today we", "in this video", "let's get started")):
        return SegmentContentType.INTRO

    code_keywords = ("import ", "def ", "class ", "function ", "const ", "npm ", "pip ", "git ")
    code_hits = len([keyword for keyword in code_keywords if keyword in text])
    if code_hits >= 2:
        return SegmentContentType.LIVE_CODING

    return SegmentContentType.EXPLANATION
def _build_segment_content(
transcript: str,
chapter_title: str | None,
start_time: float,
end_time: float,
) -> str:
"""Build merged content string for a segment."""
parts = []
# Add chapter heading
start_min, start_sec = divmod(int(start_time), 60)
end_min, end_sec = divmod(int(end_time), 60)
ts = f"{start_min:02d}:{start_sec:02d} - {end_min:02d}:{end_sec:02d}"
if chapter_title:
parts.append(f"### {chapter_title} ({ts})\n")
else:
parts.append(f"### Segment ({ts})\n")
if transcript:
parts.append(transcript)
return "\n".join(parts)
def _get_transcript_in_range(
transcript_segments: list[TranscriptSegment],
start_time: float,
end_time: float,
) -> tuple[str, float]:
"""Get concatenated transcript text and average confidence for a time range.
Returns:
Tuple of (text, avg_confidence).
"""
texts = []
confidences = []
for seg in transcript_segments:
# Check overlap: segment overlaps with time range
if seg.end > start_time and seg.start < end_time:
texts.append(seg.text)
confidences.append(seg.confidence)
text = " ".join(texts)
avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0
return text, avg_confidence
def segment_by_chapters(
    video_info: VideoInfo,
    transcript_segments: list[TranscriptSegment],
) -> list[VideoSegment]:
    """Segment video using YouTube chapter boundaries.

    Args:
        video_info: Video metadata with chapters.
        transcript_segments: Raw transcript segments.

    Returns:
        List of VideoSegment objects aligned to chapters, one per chapter and
        in chapter order.
    """

    def to_segment(position: int, chapter) -> VideoSegment:
        begin, finish = chapter.start_time, chapter.end_time
        text, score = _get_transcript_in_range(transcript_segments, begin, finish)
        return VideoSegment(
            index=position,
            start_time=begin,
            end_time=finish,
            duration=finish - begin,
            transcript=text,
            transcript_confidence=score,
            chapter_title=chapter.title,
            content=_build_segment_content(text, chapter.title, begin, finish),
            # Segment confidence mirrors the transcript confidence.
            confidence=score,
            content_type=_classify_content_type(text),
        )

    return [to_segment(position, chapter) for position, chapter in enumerate(video_info.chapters)]
def segment_by_time_window(
    video_info: VideoInfo,
    transcript_segments: list[TranscriptSegment],
    window_seconds: float = 120.0,
) -> list[VideoSegment]:
    """Segment video using fixed time windows.

    Windows that contain no transcript text are skipped, so segment indices
    are consecutive even when parts of the video are silent.

    Args:
        video_info: Video metadata.
        transcript_segments: Raw transcript segments.
        window_seconds: Duration of each window in seconds; must be positive.

    Returns:
        List of VideoSegment objects (empty when no duration can be determined).

    Raises:
        ValueError: If ``window_seconds`` is not positive. A non-positive
            window would never advance and loop forever.
    """
    if window_seconds <= 0:
        raise ValueError(f"window_seconds must be positive, got {window_seconds!r}")

    segments: list[VideoSegment] = []

    # Fall back to the end of the last transcript segment when the metadata
    # carries no usable duration.
    duration = video_info.duration
    if duration <= 0 and transcript_segments:
        duration = max(seg.end for seg in transcript_segments)
    if duration <= 0:
        return segments

    current_time = 0.0
    index = 0
    while current_time < duration:
        end_time = min(current_time + window_seconds, duration)
        transcript, confidence = _get_transcript_in_range(
            transcript_segments, current_time, end_time
        )
        if transcript.strip():
            segments.append(
                VideoSegment(
                    index=index,
                    start_time=current_time,
                    end_time=end_time,
                    duration=end_time - current_time,
                    transcript=transcript,
                    transcript_confidence=confidence,
                    content=_build_segment_content(transcript, None, current_time, end_time),
                    confidence=confidence,
                    content_type=_classify_content_type(transcript),
                )
            )
            index += 1
        current_time = end_time

    return segments
def segment_video(
    video_info: VideoInfo,
    transcript_segments: list[TranscriptSegment],
    config: VideoSourceConfig,
) -> list[VideoSegment]:
    """Segment a video using the best available strategy.

    Chapter-based segmentation is used when the video has chapters and it
    yields at least one segment; otherwise fixed time windows of
    ``config.time_window_seconds`` are used.

    Args:
        video_info: Video metadata.
        transcript_segments: Raw transcript segments.
        config: Video source configuration.

    Returns:
        List of VideoSegment objects.
    """
    chapters = video_info.chapters
    if chapters:
        logger.info(f"Using chapter-based segmentation ({len(chapters)} chapters)")
        by_chapter = segment_by_chapters(video_info, transcript_segments)
        if by_chapter:
            return by_chapter

    # No chapters, or chapters produced nothing: fall back to time windows.
    window = config.time_window_seconds
    logger.info(f"Using time-window segmentation ({window}s windows)")
    return segment_by_time_window(video_info, transcript_segments, window)

View File

@@ -0,0 +1,370 @@
"""Video transcript extraction module.
Handles all transcript acquisition:
- YouTube captions via youtube-transcript-api (Tier 1)
- Subtitle file parsing: SRT and VTT (Tier 1)
- Whisper ASR stub (Tier 2 — raises RuntimeError with install instructions)
"""
import logging
import re
from pathlib import Path
from skill_seekers.cli.video_models import (
TranscriptSegment,
TranscriptSource,
VideoInfo,
VideoSourceConfig,
VideoSourceType,
)
logger = logging.getLogger(__name__)
# Optional dependency: youtube-transcript-api
try:
from youtube_transcript_api import YouTubeTranscriptApi
HAS_YOUTUBE_TRANSCRIPT = True
except ImportError:
HAS_YOUTUBE_TRANSCRIPT = False
# Optional dependency: faster-whisper (Tier 2)
try:
from faster_whisper import WhisperModel # noqa: F401
HAS_WHISPER = True
except ImportError:
HAS_WHISPER = False
# =============================================================================
# YouTube Transcript Extraction (Tier 1)
# =============================================================================
def extract_youtube_transcript(
    video_id: str,
    languages: list[str] | None = None,
) -> tuple[list[TranscriptSegment], TranscriptSource]:
    """Fetch YouTube captions via youtube-transcript-api.

    Fetch failures are logged as a warning and reported as an empty
    transcript rather than raised.

    Args:
        video_id: YouTube video ID (11 chars).
        languages: Language preference list (e.g., ['en', 'tr']).
            Defaults to ``['en']``.

    Returns:
        Tuple of (transcript segments, source type). The source is
        ``YOUTUBE_AUTO`` when the fetched transcript is flagged as
        auto-generated, ``YOUTUBE_MANUAL`` otherwise, and ``NONE`` (with an
        empty list) when nothing usable was fetched.

    Raises:
        RuntimeError: If youtube-transcript-api is not installed.
    """
    if not HAS_YOUTUBE_TRANSCRIPT:
        raise RuntimeError(
            "youtube-transcript-api is required for YouTube transcript extraction.\n"
            'Install with: pip install "skill-seekers[video]"\n'
            "Or: pip install youtube-transcript-api"
        )

    if languages is None:
        languages = ["en"]

    try:
        transcript = YouTubeTranscriptApi().fetch(video_id, languages=languages)

        # Distinguish auto-generated captions so callers can discount their
        # confidence. "is True" keeps the manual default when the attribute is
        # absent or not a real boolean.
        if getattr(transcript, "is_generated", False) is True:
            source = TranscriptSource.YOUTUBE_AUTO
        else:
            source = TranscriptSource.YOUTUBE_MANUAL

        segments = []
        for snippet in transcript.snippets:
            text = snippet.text.strip()
            if not text:
                continue
            segments.append(
                TranscriptSegment(
                    text=text,
                    start=snippet.start,
                    end=snippet.start + snippet.duration,
                    confidence=1.0,
                    source=source,
                )
            )

        if not segments:
            return [], TranscriptSource.NONE
        return segments, source
    except Exception as e:
        # Deliberate best effort: the caller falls back to other sources.
        logger.warning(f"Failed to fetch YouTube transcript for {video_id}: {e}")
        return [], TranscriptSource.NONE
# =============================================================================
# Subtitle File Parsing (Tier 1)
# =============================================================================
def _parse_timestamp_srt(ts: str) -> float:
"""Parse SRT timestamp (HH:MM:SS,mmm) to seconds."""
ts = ts.strip().replace(",", ".")
parts = ts.split(":")
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
return 0.0
def _parse_timestamp_vtt(ts: str) -> float:
"""Parse VTT timestamp (HH:MM:SS.mmm or MM:SS.mmm) to seconds."""
ts = ts.strip()
parts = ts.split(":")
if len(parts) == 3:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
elif len(parts) == 2:
m, s = parts
return int(m) * 60 + float(s)
return 0.0
def parse_srt(path: str) -> list[TranscriptSegment]:
    """Parse an SRT subtitle file into TranscriptSegments.

    Blocks are separated by blank lines; within a block, the lines after the
    ``-->`` timestamp line form the cue text. HTML-style tags are removed,
    and cues whose text is empty after tag removal are skipped.

    Args:
        path: Path to .srt file.

    Returns:
        List of TranscriptSegment objects.
    """
    content = Path(path).read_text(encoding="utf-8", errors="replace")
    segments = []

    # SRT format: index\nstart --> end\ntext\n\n
    for block in re.split(r"\n\s*\n", content.strip()):
        lines = block.strip().split("\n")
        if len(lines) < 2:
            continue

        # Find the timestamp line (contains -->); text is everything after it.
        ts_line = None
        text_lines = []
        for line in lines:
            if "-->" in line:
                ts_line = line
            elif ts_line is not None:
                text_lines.append(line)
        if ts_line is None:
            continue

        parts = ts_line.split("-->")
        if len(parts) != 2:
            continue

        # Remove HTML tags first, then strip, so tag-only cues such as
        # "<i> </i>" do not become whitespace-only segments.
        text = re.sub(r"<[^>]+>", "", " ".join(text_lines)).strip()
        if not text:
            continue

        segments.append(
            TranscriptSegment(
                text=text,
                start=_parse_timestamp_srt(parts[0]),
                end=_parse_timestamp_srt(parts[1]),
                confidence=1.0,
                source=TranscriptSource.SUBTITLE_FILE,
            )
        )

    return segments
def parse_vtt(path: str) -> list[TranscriptSegment]:
    """Parse a WebVTT subtitle file into TranscriptSegments.

    Everything before the first line that starts with a timestamp is treated
    as header/metadata. A cue starts at a ``-->`` line and ends at the next
    blank line, the next ``-->`` line, or the end of the file. HTML-style
    tags are removed and cues left without text are skipped.

    Args:
        path: Path to .vtt file.

    Returns:
        List of TranscriptSegment objects.
    """
    content = Path(path).read_text(encoding="utf-8", errors="replace")
    lines = content.strip().split("\n")
    segments: list[TranscriptSegment] = []

    def emit(text_lines: list[str], start: float, end: float) -> None:
        # Single place where a finished cue becomes a segment. Tags are removed
        # before stripping so tag-only cues do not yield blank segments.
        text = re.sub(r"<[^>]+>", "", " ".join(text_lines)).strip()
        if text:
            segments.append(
                TranscriptSegment(
                    text=text,
                    start=start,
                    end=end,
                    confidence=1.0,
                    source=TranscriptSource.SUBTITLE_FILE,
                )
            )

    # Skip WEBVTT header and any metadata
    first_cue = 0
    while first_cue < len(lines) and not re.match(r"\d{2}:\d{2}", lines[first_cue]):
        first_cue += 1

    current_text_lines: list[str] = []
    current_start = 0.0
    current_end = 0.0
    in_cue = False

    for raw_line in lines[first_cue:]:
        line = raw_line.strip()

        if "-->" in line:
            # A new timing line closes any cue that is still open.
            if in_cue:
                emit(current_text_lines, current_start, current_end)
            parts = line.split("-->")
            current_start = _parse_timestamp_vtt(parts[0])
            # The end timestamp may be followed by cue settings; keep only
            # the first whitespace-separated token.
            current_end = _parse_timestamp_vtt(parts[1].split()[0])
            current_text_lines = []
            in_cue = True
        elif line == "":
            if in_cue:
                emit(current_text_lines, current_start, current_end)
            current_text_lines = []
            in_cue = False
        elif in_cue:
            # Skip cue identifiers (numeric lines before timestamps)
            if not line.isdigit():
                current_text_lines.append(line)

    # Handle last cue
    if in_cue:
        emit(current_text_lines, current_start, current_end)

    return segments
# =============================================================================
# Whisper Stub (Tier 2)
# =============================================================================
def transcribe_with_whisper(
    audio_path: str,  # noqa: ARG001
    model: str = "base",  # noqa: ARG001
    language: str | None = None,  # noqa: ARG001
) -> list[TranscriptSegment]:
    """Transcribe audio using faster-whisper (Tier 2).

    This is currently a stub: no argument is used and every call raises.

    Args:
        audio_path: Path to the audio/video file to transcribe (unused).
        model: Whisper model name (unused).
        language: Optional language code (unused).

    Raises:
        RuntimeError: If faster-whisper is not installed (``HAS_WHISPER`` is
            false).
        NotImplementedError: If faster-whisper is installed; the actual
            transcription is not implemented yet.
    """
    if HAS_WHISPER:
        # Tier 2 implementation placeholder
        raise NotImplementedError("Whisper transcription will be implemented in Tier 2")

    install_hint = (
        "faster-whisper is required for Whisper transcription.\n"
        'Install with: pip install "skill-seekers[video-full]"\n'
        "Or: pip install faster-whisper"
    )
    raise RuntimeError(install_hint)
# =============================================================================
# Main Entry Point
# =============================================================================
def get_transcript(
    video_info: VideoInfo,
    config: VideoSourceConfig,
) -> tuple[list[TranscriptSegment], TranscriptSource]:
    """Get transcript for a video, trying available methods in priority order.

    Priority:
        1. YouTube API (for YouTube videos)
        2. Subtitle files (SRT/VTT alongside local files)
        3. Whisper fallback (Tier 2)
        4. NONE (no transcript available)

    A method that fails or returns no segments does not abort the lookup;
    the failure is logged and the next method is tried.

    Args:
        video_info: Video metadata.
        config: Video source configuration.

    Returns:
        Tuple of (transcript segments, source type). When no method
        succeeds, this is ``([], TranscriptSource.NONE)``.
    """
    # Non-empty by construction, so languages[0] below is always valid.
    languages = config.languages or ["en"]

    # 1. Try YouTube API for YouTube videos
    if video_info.source_type == VideoSourceType.YOUTUBE and HAS_YOUTUBE_TRANSCRIPT:
        try:
            segments, source = extract_youtube_transcript(video_info.video_id, languages)
            if segments:
                logger.info(
                    f"Got {len(segments)} transcript segments via YouTube API "
                    f"({source.value}) for '{video_info.title}'"
                )
                return segments, source
        except Exception as e:
            logger.warning(f"YouTube transcript failed: {e}")

    # 2. Try subtitle files for local videos
    if video_info.file_path:
        video_path = Path(video_info.file_path)
        for ext, parser in ((".srt", parse_srt), (".vtt", parse_vtt)):
            sub_path = video_path.parent / f"{video_path.stem}{ext}"
            if not sub_path.exists():
                continue
            logger.info(f"Found subtitle file: {sub_path}")
            try:
                segments = parser(str(sub_path))
            except (OSError, ValueError, IndexError) as e:
                # An unreadable or malformed subtitle file must not prevent
                # the remaining methods from being tried.
                logger.warning(f"Failed to parse subtitle file {sub_path}: {e}")
                continue
            if segments:
                return segments, TranscriptSource.SUBTITLE_FILE

    # 3. Whisper fallback (Tier 2 — only if installed)
    if HAS_WHISPER and video_info.file_path:
        try:
            segments = transcribe_with_whisper(
                video_info.file_path,
                model=config.whisper_model,
                language=languages[0],
            )
        except (RuntimeError, NotImplementedError) as e:
            # Expected while Whisper support is a stub; keep it visible at
            # debug level instead of swallowing it silently.
            logger.debug(f"Whisper transcription unavailable: {e}")
        else:
            if segments:
                return segments, TranscriptSource.WHISPER

    # 4. No transcript available
    logger.warning(f"No transcript available for '{video_info.title}'")
    return [], TranscriptSource.NONE

File diff suppressed because it is too large Load Diff