Restructure skill to follow Progressive Disclosure Architecture

Structure Changes:
- Move Python scripts to scripts/ directory
- Move sample JSON files to assets/ directory
- Create references/ directory with extracted content
- Remove redundant HOW_TO_USE.md and README.md

New Reference Files:
- references/metrics.md: Detailed scoring algorithms and formulas
- references/examples.md: Concrete input/output examples
- references/workflows.md: Step-by-step evaluation workflows

SKILL.md Improvements:
- Reduced from 430 lines to ~180 lines
- Added table of contents
- Added trigger phrases in description
- Consistent imperative voice
- Points to references for details

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
"""
|
|
Input Format Detector.
|
|
|
|
Automatically detects input format (text, YAML, JSON, URLs) and parses
|
|
accordingly for technology stack evaluation requests.
|
|
"""
|
|
|
|
from typing import Dict, Any, Optional, Tuple
|
|
import json
|
|
import re
|
|
|
|
|
|
class FormatDetector:
    """Detect and parse various input formats for stack evaluation."""

    def __init__(self, input_data: str):
        """
        Initialize format detector with raw input.

        Args:
            input_data: Raw input string from user
        """
        self.raw_input = input_data.strip()
        self.detected_format = None
        self.parsed_data = None

    def detect_format(self) -> str:
        """
        Detect the input format.

        Returns:
            Format type: 'json', 'yaml', 'url', 'text'
        """
        # Try JSON first
        if self._is_json():
            self.detected_format = 'json'
            return 'json'

        # Try YAML
        if self._is_yaml():
            self.detected_format = 'yaml'
            return 'yaml'

        # Check for URLs
        if self._contains_urls():
            self.detected_format = 'url'
            return 'url'

        # Default to conversational text
        self.detected_format = 'text'
        return 'text'

    def _is_json(self) -> bool:
        """Check if input is valid JSON."""
        try:
            json.loads(self.raw_input)
            return True
        except (json.JSONDecodeError, ValueError):
            return False

    def _is_yaml(self) -> bool:
        """
        Check if input looks like YAML.

        Returns:
            True if input appears to be YAML format
        """
        # YAML indicators
        yaml_patterns = [
            r'^\s*[\w\-]+\s*:',  # Key-value pairs
            r'^\s*-\s+',         # List items
            r':\s*$',            # Trailing colons
        ]

        # Must not be JSON
        if self._is_json():
            return False

        # Check for YAML patterns
        lines = self.raw_input.split('\n')
        yaml_line_count = 0

        for line in lines:
            for pattern in yaml_patterns:
                if re.match(pattern, line):
                    yaml_line_count += 1
                    break

        # If >50% of lines match YAML patterns, consider it YAML
        if len(lines) > 0 and yaml_line_count / len(lines) > 0.5:
            return True

        return False

    def _contains_urls(self) -> bool:
        """Check if input contains URLs."""
        url_pattern = r'https?://[^\s]+'
        return bool(re.search(url_pattern, self.raw_input))

    def parse(self) -> Dict[str, Any]:
        """
        Parse input based on detected format.

        Returns:
            Parsed data dictionary
        """
        if self.detected_format is None:
            self.detect_format()

        if self.detected_format == 'json':
            self.parsed_data = self._parse_json()
        elif self.detected_format == 'yaml':
            self.parsed_data = self._parse_yaml()
        elif self.detected_format == 'url':
            self.parsed_data = self._parse_urls()
        else:  # text
            self.parsed_data = self._parse_text()

        return self.parsed_data

    def _parse_json(self) -> Dict[str, Any]:
        """Parse JSON input."""
        try:
            data = json.loads(self.raw_input)
            return self._normalize_structure(data)
        except json.JSONDecodeError:
            return {'error': 'Invalid JSON', 'raw': self.raw_input}

    def _parse_yaml(self) -> Dict[str, Any]:
        """
        Parse YAML-like input (simplified, no external dependencies).

        Returns:
            Parsed dictionary
        """
        result = {}
        current_section = None
        current_list = None

        lines = self.raw_input.split('\n')

        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith('#'):
                continue

            # Key-value pair
            if ':' in stripped:
                key, value = stripped.split(':', 1)
                key = key.strip()
                value = value.strip()

                # Empty value might indicate nested structure
                if not value:
                    current_section = key
                    result[current_section] = {}
                    current_list = None
                else:
                    if current_section:
                        result[current_section][key] = self._parse_value(value)
                    else:
                        result[key] = self._parse_value(value)

            # List item
            elif stripped.startswith('-'):
                item = stripped[1:].strip()
                if current_section:
                    if current_list is None:
                        current_list = []
                        result[current_section] = current_list
                    current_list.append(self._parse_value(item))

        return self._normalize_structure(result)

    def _parse_value(self, value: str) -> Any:
        """
        Parse a value string to appropriate type.

        Args:
            value: Value string

        Returns:
            Parsed value (str, int, float, bool)
        """
        value = value.strip()

        # Boolean
        if value.lower() in ['true', 'yes']:
            return True
        if value.lower() in ['false', 'no']:
            return False

        # Number
        try:
            if '.' in value:
                return float(value)
            else:
                return int(value)
        except ValueError:
            pass

        # String (remove quotes if present)
        if value.startswith('"') and value.endswith('"'):
            return value[1:-1]
        if value.startswith("'") and value.endswith("'"):
            return value[1:-1]

        return value

    def _parse_urls(self) -> Dict[str, Any]:
        """Parse URLs from input."""
        url_pattern = r'https?://[^\s]+'
        urls = re.findall(url_pattern, self.raw_input)

        # Categorize URLs
        github_urls = [u for u in urls if 'github.com' in u]
        npm_urls = [u for u in urls if 'npmjs.com' in u or 'npm.io' in u]
        other_urls = [u for u in urls if u not in github_urls and u not in npm_urls]

        # Also extract any text context
        text_without_urls = re.sub(url_pattern, '', self.raw_input).strip()

        result = {
            'format': 'url',
            'urls': {
                'github': github_urls,
                'npm': npm_urls,
                'other': other_urls
            },
            'context': text_without_urls
        }

        return self._normalize_structure(result)

    def _parse_text(self) -> Dict[str, Any]:
        """Parse conversational text input."""
        text = self.raw_input.lower()

        # Extract technologies being compared
        technologies = self._extract_technologies(text)

        # Extract use case
        use_case = self._extract_use_case(text)

        # Extract priorities
        priorities = self._extract_priorities(text)

        # Detect analysis type
        analysis_type = self._detect_analysis_type(text)

        result = {
            'format': 'text',
            'technologies': technologies,
            'use_case': use_case,
            'priorities': priorities,
            'analysis_type': analysis_type,
            'raw_text': self.raw_input
        }

        return self._normalize_structure(result)

    def _extract_technologies(self, text: str) -> list:
        """
        Extract technology names from text.

        Args:
            text: Lowercase text

        Returns:
            List of identified technologies
        """
        # Common technologies pattern
        tech_keywords = [
            'react', 'vue', 'angular', 'svelte', 'next.js', 'nuxt.js',
            'node.js', 'python', 'java', 'go', 'rust', 'ruby',
            'postgresql', 'postgres', 'mysql', 'mongodb', 'redis',
            'aws', 'azure', 'gcp', 'google cloud',
            'docker', 'kubernetes', 'k8s',
            'express', 'fastapi', 'django', 'flask', 'spring boot'
        ]

        found = []
        for tech in tech_keywords:
            if tech in text:
                # Normalize names ('postgresql' is mapped explicitly so it does not
                # produce a 'Postgresql'/'PostgreSQL' duplicate alongside 'postgres')
                normalized = {
                    'postgresql': 'PostgreSQL',
                    'postgres': 'PostgreSQL',
                    'next.js': 'Next.js',
                    'nuxt.js': 'Nuxt.js',
                    'node.js': 'Node.js',
                    'k8s': 'Kubernetes',
                    'gcp': 'Google Cloud Platform'
                }.get(tech, tech.title())

                if normalized not in found:
                    found.append(normalized)

        return found if found else ['Unknown']

    def _extract_use_case(self, text: str) -> str:
        """
        Extract use case description from text.

        Args:
            text: Lowercase text

        Returns:
            Use case description
        """
        use_case_keywords = {
            'real-time': 'Real-time application',
            'collaboration': 'Collaboration platform',
            'saas': 'SaaS application',
            'dashboard': 'Dashboard application',
            'api': 'API-heavy application',
            'data-intensive': 'Data-intensive application',
            'e-commerce': 'E-commerce platform',
            'enterprise': 'Enterprise application'
        }

        for keyword, description in use_case_keywords.items():
            if keyword in text:
                return description

        return 'General purpose application'

    def _extract_priorities(self, text: str) -> list:
        """
        Extract priority criteria from text.

        Args:
            text: Lowercase text

        Returns:
            List of priorities
        """
        priority_keywords = {
            'performance': 'Performance',
            'scalability': 'Scalability',
            'developer experience': 'Developer experience',
            'ecosystem': 'Ecosystem',
            'learning curve': 'Learning curve',
            'cost': 'Cost',
            'security': 'Security',
            'compliance': 'Compliance'
        }

        priorities = []
        for keyword, priority in priority_keywords.items():
            if keyword in text:
                priorities.append(priority)

        return priorities if priorities else ['Developer experience', 'Performance']

    def _detect_analysis_type(self, text: str) -> str:
        """
        Detect type of analysis requested.

        Args:
            text: Lowercase text

        Returns:
            Analysis type
        """
        type_keywords = {
            'migration': 'migration_analysis',
            'migrate': 'migration_analysis',
            'tco': 'tco_analysis',
            'total cost': 'tco_analysis',
            'security': 'security_analysis',
            'compliance': 'security_analysis',
            'compare': 'comparison',
            'vs': 'comparison',
            'evaluate': 'evaluation'
        }

        for keyword, analysis_type in type_keywords.items():
            if keyword in text:
                return analysis_type

        return 'comparison'  # Default

    def _normalize_structure(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Normalize parsed data to standard structure.

        Args:
            data: Parsed data dictionary

        Returns:
            Normalized data structure
        """
        # Ensure standard keys exist
        standard_keys = [
            'technologies',
            'use_case',
            'priorities',
            'analysis_type',
            'format'
        ]

        normalized = data.copy()

        for key in standard_keys:
            if key not in normalized:
                # Set defaults
                defaults = {
                    'technologies': [],
                    'use_case': 'general',
                    'priorities': [],
                    'analysis_type': 'comparison',
                    'format': self.detected_format or 'unknown'
                }
                normalized[key] = defaults.get(key)

        return normalized

    def get_format_info(self) -> Dict[str, Any]:
        """
        Get information about detected format.

        Returns:
            Format detection metadata
        """
        return {
            'detected_format': self.detected_format,
            'input_length': len(self.raw_input),
            'line_count': len(self.raw_input.split('\n')),
            'parsing_successful': self.parsed_data is not None
        }
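

# Illustrative usage sketch (not part of the original module). It shows the
# intended flow described in the module docstring: build a FormatDetector,
# call detect_format(), then parse() to get the normalized dictionary. The
# sample inputs below are hypothetical requests invented for this demo, not
# fixtures shipped with the skill.
if __name__ == '__main__':
    samples = [
        '{"technologies": ["React", "Vue"], "use_case": "dashboard"}',
        'use_case: dashboard\ntechnologies:\n  - react\n  - vue',
        'Compare https://github.com/facebook/react with Vue for a SaaS dashboard',
        'Should we use React or Vue for a real-time dashboard? Performance matters.'
    ]

    for sample in samples:
        detector = FormatDetector(sample)
        detected = detector.detect_format()
        parsed = detector.parse()
        info = detector.get_format_info()
        print(f"format={detected}, "
              f"technologies={parsed.get('technologies')}, "
              f"analysis_type={parsed.get('analysis_type')}, "
              f"parsed_ok={info['parsing_successful']}")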