Files
2026-01-26 20:35:11 +01:00

646 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
"""
API Scaffolder
Generates Express.js route handlers, validation middleware, and TypeScript types
from OpenAPI specifications (YAML/JSON).
Usage:
python api_scaffolder.py openapi.yaml --output src/routes/
python api_scaffolder.py openapi.json --framework fastify --output src/
python api_scaffolder.py spec.yaml --types-only --output src/types/
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Optional, Any
from datetime import datetime
def load_yaml_as_json(content: str) -> Dict:
"""Parse YAML content without PyYAML dependency (basic subset)."""
lines = content.split('\n')
result = {}
stack = [(result, -1)]
current_key = None
in_array = False
array_indent = -1
for line in lines:
stripped = line.lstrip()
if not stripped or stripped.startswith('#'):
continue
indent = len(line) - len(stripped)
# Pop stack until we find the right level
while len(stack) > 1 and stack[-1][1] >= indent:
stack.pop()
current_obj = stack[-1][0]
if stripped.startswith('- '):
# Array item
value = stripped[2:].strip()
if isinstance(current_obj, list):
if ':' in value:
# Object in array
key, val = value.split(':', 1)
new_obj = {key.strip(): val.strip().strip('"').strip("'")}
current_obj.append(new_obj)
stack.append((new_obj, indent))
else:
current_obj.append(value.strip('"').strip("'"))
elif ':' in stripped:
key, value = stripped.split(':', 1)
key = key.strip()
value = value.strip()
if value == '':
# Check next line for array or object
new_obj = {}
current_obj[key] = new_obj
stack.append((new_obj, indent))
elif value.startswith('[') and value.endswith(']'):
# Inline array
items = value[1:-1].split(',')
current_obj[key] = [i.strip().strip('"').strip("'") for i in items if i.strip()]
else:
# Simple value
value = value.strip('"').strip("'")
if value.lower() == 'true':
value = True
elif value.lower() == 'false':
value = False
elif value.isdigit():
value = int(value)
current_obj[key] = value
return result
def load_spec(spec_path: Path) -> Dict:
"""Load OpenAPI spec from YAML or JSON file."""
content = spec_path.read_text()
if spec_path.suffix in ['.yaml', '.yml']:
try:
import yaml
return yaml.safe_load(content)
except ImportError:
# Fallback to basic YAML parser
return load_yaml_as_json(content)
else:
return json.loads(content)
def openapi_type_to_ts(schema: Dict) -> str:
"""Convert OpenAPI schema type to TypeScript type."""
if not schema:
return 'unknown'
if '$ref' in schema:
ref = schema['$ref']
return ref.split('/')[-1]
type_map = {
'string': 'string',
'integer': 'number',
'number': 'number',
'boolean': 'boolean',
'object': 'Record<string, unknown>',
'array': 'unknown[]',
}
schema_type = schema.get('type', 'unknown')
if schema_type == 'array':
items = schema.get('items', {})
item_type = openapi_type_to_ts(items)
return f'{item_type}[]'
if schema_type == 'object':
properties = schema.get('properties', {})
if properties:
props = []
required = schema.get('required', [])
for name, prop in properties.items():
ts_type = openapi_type_to_ts(prop)
optional = '?' if name not in required else ''
props.append(f' {name}{optional}: {ts_type};')
return '{\n' + '\n'.join(props) + '\n}'
return 'Record<string, unknown>'
if 'enum' in schema:
values = ' | '.join(f"'{v}'" for v in schema['enum'])
return values
return type_map.get(schema_type, 'unknown')
def generate_zod_schema(schema: Dict, name: str) -> str:
"""Generate Zod validation schema from OpenAPI schema."""
if not schema:
return f'export const {name}Schema = z.unknown();'
def schema_to_zod(s: Dict) -> str:
if '$ref' in s:
ref_name = s['$ref'].split('/')[-1]
return f'{ref_name}Schema'
s_type = s.get('type', 'unknown')
if s_type == 'string':
zod = 'z.string()'
if 'minLength' in s:
zod += f'.min({s["minLength"]})'
if 'maxLength' in s:
zod += f'.max({s["maxLength"]})'
if 'pattern' in s:
zod += f'.regex(/{s["pattern"]}/)'
if s.get('format') == 'email':
zod += '.email()'
if s.get('format') == 'uuid':
zod += '.uuid()'
if 'enum' in s:
values = ', '.join(f"'{v}'" for v in s['enum'])
return f'z.enum([{values}])'
return zod
if s_type == 'integer':
zod = 'z.number().int()'
if 'minimum' in s:
zod += f'.min({s["minimum"]})'
if 'maximum' in s:
zod += f'.max({s["maximum"]})'
return zod
if s_type == 'number':
zod = 'z.number()'
if 'minimum' in s:
zod += f'.min({s["minimum"]})'
if 'maximum' in s:
zod += f'.max({s["maximum"]})'
return zod
if s_type == 'boolean':
return 'z.boolean()'
if s_type == 'array':
items_zod = schema_to_zod(s.get('items', {}))
return f'z.array({items_zod})'
if s_type == 'object':
properties = s.get('properties', {})
required = s.get('required', [])
if not properties:
return 'z.record(z.unknown())'
props = []
for prop_name, prop_schema in properties.items():
prop_zod = schema_to_zod(prop_schema)
if prop_name not in required:
prop_zod += '.optional()'
props.append(f' {prop_name}: {prop_zod},')
return 'z.object({\n' + '\n'.join(props) + '\n})'
return 'z.unknown()'
return f'export const {name}Schema = {schema_to_zod(schema)};'
def to_camel_case(s: str) -> str:
"""Convert string to camelCase."""
s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
words = s.split()
if not words:
return s
return words[0].lower() + ''.join(w.capitalize() for w in words[1:])
def to_pascal_case(s: str) -> str:
"""Convert string to PascalCase."""
s = re.sub(r'[^a-zA-Z0-9]', ' ', s)
return ''.join(w.capitalize() for w in s.split())
def extract_path_params(path: str) -> List[str]:
"""Extract path parameters from OpenAPI path."""
return re.findall(r'\{(\w+)\}', path)
def openapi_path_to_express(path: str) -> str:
"""Convert OpenAPI path to Express path format."""
return re.sub(r'\{(\w+)\}', r':\1', path)
class APIScaffolder:
"""Generate Express.js routes from OpenAPI specification."""
SUPPORTED_FRAMEWORKS = ['express', 'fastify', 'koa']
def __init__(self, spec_path: str, output_dir: str, framework: str = 'express',
types_only: bool = False, verbose: bool = False):
self.spec_path = Path(spec_path)
self.output_dir = Path(output_dir)
self.framework = framework
self.types_only = types_only
self.verbose = verbose
self.spec: Dict = {}
self.generated_files: List[str] = []
def run(self) -> Dict:
"""Execute scaffolding process."""
print(f"API Scaffolder - {self.framework.capitalize()}")
print(f"Spec: {self.spec_path}")
print(f"Output: {self.output_dir}")
print("-" * 50)
self.validate()
self.load_spec()
self.ensure_output_dir()
if self.types_only:
self.generate_types()
else:
self.generate_types()
self.generate_validators()
self.generate_routes()
self.generate_index()
return {
'status': 'success',
'spec': str(self.spec_path),
'output': str(self.output_dir),
'framework': self.framework,
'generated_files': self.generated_files,
'routes_count': len(self.get_operations()),
'types_count': len(self.get_schemas()),
}
def validate(self):
"""Validate inputs."""
if not self.spec_path.exists():
raise FileNotFoundError(f"Spec file not found: {self.spec_path}")
if self.framework not in self.SUPPORTED_FRAMEWORKS:
raise ValueError(f"Unsupported framework: {self.framework}")
def load_spec(self):
"""Load and parse OpenAPI specification."""
self.spec = load_spec(self.spec_path)
if self.verbose:
title = self.spec.get('info', {}).get('title', 'Unknown')
version = self.spec.get('info', {}).get('version', '0.0.0')
print(f"Loaded: {title} v{version}")
def ensure_output_dir(self):
"""Create output directory if needed."""
self.output_dir.mkdir(parents=True, exist_ok=True)
def get_schemas(self) -> Dict:
"""Get component schemas from spec."""
return self.spec.get('components', {}).get('schemas', {})
def get_operations(self) -> List[Dict]:
"""Extract all operations from spec."""
operations = []
paths = self.spec.get('paths', {})
for path, methods in paths.items():
if not isinstance(methods, dict):
continue
for method, details in methods.items():
if method.lower() not in ['get', 'post', 'put', 'patch', 'delete']:
continue
if not isinstance(details, dict):
continue
op_id = details.get('operationId', f'{method}_{path}'.replace('/', '_'))
operations.append({
'path': path,
'method': method.lower(),
'operation_id': op_id,
'summary': details.get('summary', ''),
'parameters': details.get('parameters', []),
'request_body': details.get('requestBody', {}),
'responses': details.get('responses', {}),
'tags': details.get('tags', ['default']),
})
return operations
def generate_types(self):
"""Generate TypeScript type definitions."""
schemas = self.get_schemas()
lines = [
'// Auto-generated TypeScript types',
f'// Generated from: {self.spec_path.name}',
f'// Date: {datetime.now().isoformat()}',
'',
]
for name, schema in schemas.items():
ts_type = openapi_type_to_ts(schema)
if ts_type.startswith('{'):
lines.append(f'export interface {name} {ts_type}')
else:
lines.append(f'export type {name} = {ts_type};')
lines.append('')
# Generate request/response types from operations
for op in self.get_operations():
op_name = to_pascal_case(op['operation_id'])
# Request body type
req_body = op.get('request_body', {})
if req_body:
content = req_body.get('content', {})
json_content = content.get('application/json', {})
schema = json_content.get('schema', {})
if schema and '$ref' not in schema:
ts_type = openapi_type_to_ts(schema)
lines.append(f'export interface {op_name}Request {ts_type}')
lines.append('')
# Response type (200 response)
responses = op.get('responses', {})
success_resp = responses.get('200', responses.get('201', {}))
if success_resp:
content = success_resp.get('content', {})
json_content = content.get('application/json', {})
schema = json_content.get('schema', {})
if schema and '$ref' not in schema:
ts_type = openapi_type_to_ts(schema)
lines.append(f'export interface {op_name}Response {ts_type}')
lines.append('')
types_file = self.output_dir / 'types.ts'
types_file.write_text('\n'.join(lines))
self.generated_files.append(str(types_file))
print(f" Generated: {types_file}")
def generate_validators(self):
"""Generate Zod validation schemas."""
schemas = self.get_schemas()
lines = [
"import { z } from 'zod';",
'',
'// Auto-generated Zod validation schemas',
f'// Generated from: {self.spec_path.name}',
'',
]
for name, schema in schemas.items():
zod_schema = generate_zod_schema(schema, name)
lines.append(zod_schema)
lines.append(f'export type {name} = z.infer<typeof {name}Schema>;')
lines.append('')
# Generate validation middleware
lines.extend([
'// Validation middleware factory',
'import { Request, Response, NextFunction } from "express";',
'',
'export function validate<T>(schema: z.ZodSchema<T>) {',
' return (req: Request, res: Response, next: NextFunction) => {',
' const result = schema.safeParse(req.body);',
' if (!result.success) {',
' return res.status(400).json({',
' error: {',
' code: "VALIDATION_ERROR",',
' message: "Request validation failed",',
' details: result.error.errors.map(e => ({',
' field: e.path.join("."),',
' message: e.message,',
' })),',
' },',
' });',
' }',
' req.body = result.data;',
' next();',
' };',
'}',
])
validators_file = self.output_dir / 'validators.ts'
validators_file.write_text('\n'.join(lines))
self.generated_files.append(str(validators_file))
print(f" Generated: {validators_file}")
def generate_routes(self):
"""Generate route handlers."""
operations = self.get_operations()
# Group by tag
routes_by_tag: Dict[str, List[Dict]] = {}
for op in operations:
tag = op['tags'][0] if op['tags'] else 'default'
if tag not in routes_by_tag:
routes_by_tag[tag] = []
routes_by_tag[tag].append(op)
# Generate a route file per tag
for tag, ops in routes_by_tag.items():
self.generate_route_file(tag, ops)
def generate_route_file(self, tag: str, operations: List[Dict]):
"""Generate a single route file."""
tag_name = to_camel_case(tag)
lines = [
"import { Router, Request, Response, NextFunction } from 'express';",
"import { validate } from './validators';",
"import * as schemas from './validators';",
'',
f'const router = Router();',
'',
]
for op in operations:
method = op['method']
path = openapi_path_to_express(op['path'])
handler_name = to_camel_case(op['operation_id'])
summary = op.get('summary', '')
# Check if has request body
req_body = op.get('request_body', {})
has_body = bool(req_body.get('content', {}).get('application/json'))
# Find schema reference
schema_ref = None
if has_body:
content = req_body.get('content', {}).get('application/json', {})
schema = content.get('schema', {})
if '$ref' in schema:
schema_ref = schema['$ref'].split('/')[-1]
lines.append(f'/**')
if summary:
lines.append(f' * {summary}')
lines.append(f' * {method.upper()} {op["path"]}')
lines.append(f' */')
middleware = ''
if schema_ref:
middleware = f'validate(schemas.{schema_ref}Schema), '
lines.append(f"router.{method}('{path}', {middleware}async (req: Request, res: Response, next: NextFunction) => {{")
lines.append(' try {')
# Extract path params
path_params = extract_path_params(op['path'])
if path_params:
lines.append(f" const {{ {', '.join(path_params)} }} = req.params;")
lines.append('')
lines.append(f' // TODO: Implement {handler_name}')
lines.append('')
# Default response based on method
if method == 'post':
lines.append(" res.status(201).json({ message: 'Created' });")
elif method == 'delete':
lines.append(" res.status(204).send();")
else:
lines.append(" res.json({ message: 'OK' });")
lines.append(' } catch (err) {')
lines.append(' next(err);')
lines.append(' }')
lines.append('});')
lines.append('')
lines.append(f'export default router;')
route_file = self.output_dir / f'{tag_name}.routes.ts'
route_file.write_text('\n'.join(lines))
self.generated_files.append(str(route_file))
print(f" Generated: {route_file} ({len(operations)} handlers)")
def generate_index(self):
"""Generate index file that combines all routes."""
operations = self.get_operations()
# Get unique tags
tags = set()
for op in operations:
tag = op['tags'][0] if op['tags'] else 'default'
tags.add(tag)
lines = [
"import { Router } from 'express';",
'',
]
for tag in sorted(tags):
tag_name = to_camel_case(tag)
lines.append(f"import {tag_name}Routes from './{tag_name}.routes';")
lines.extend([
'',
'const router = Router();',
'',
])
for tag in sorted(tags):
tag_name = to_camel_case(tag)
# Use tag as base path
base_path = '/' + tag.lower().replace(' ', '-')
lines.append(f"router.use('{base_path}', {tag_name}Routes);")
lines.extend([
'',
'export default router;',
])
index_file = self.output_dir / 'index.ts'
index_file.write_text('\n'.join(lines))
self.generated_files.append(str(index_file))
print(f" Generated: {index_file}")
def main():
"""CLI entry point."""
parser = argparse.ArgumentParser(
description='Generate Express.js routes from OpenAPI specification',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''
Examples:
%(prog)s openapi.yaml --output src/routes/
%(prog)s spec.json --framework fastify --output src/api/
%(prog)s openapi.yaml --types-only --output src/types/
'''
)
parser.add_argument(
'spec',
help='Path to OpenAPI specification (YAML or JSON)'
)
parser.add_argument(
'--output', '-o',
default='./generated',
help='Output directory (default: ./generated)'
)
parser.add_argument(
'--framework', '-f',
choices=['express', 'fastify', 'koa'],
default='express',
help='Target framework (default: express)'
)
parser.add_argument(
'--types-only',
action='store_true',
help='Generate only TypeScript types'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='Enable verbose output'
)
parser.add_argument(
'--json',
action='store_true',
help='Output results as JSON'
)
args = parser.parse_args()
try:
scaffolder = APIScaffolder(
spec_path=args.spec,
output_dir=args.output,
framework=args.framework,
types_only=args.types_only,
verbose=args.verbose,
)
results = scaffolder.run()
print("-" * 50)
print(f"Generated {results['routes_count']} route handlers")
print(f"Generated {results['types_count']} type definitions")
print(f"Output: {results['output']}")
if args.json:
print(json.dumps(results, indent=2))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()