Files
claude-skills-reference/engineering-team/senior-backend/scripts/database_migration_tool.py
2026-01-26 20:35:11 +01:00

859 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Database Migration Tool
Analyzes SQL schema files, detects potential issues, suggests indexes,
and generates migration scripts with rollback support.
Usage:
python database_migration_tool.py schema.sql --analyze
python database_migration_tool.py old.sql --compare new.sql --output migrations/
python database_migration_tool.py schema.sql --suggest-indexes
"""
import os
import sys
import json
import argparse
import re
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple
from datetime import datetime
from dataclasses import dataclass, field, asdict
@dataclass
class Column:
    """Database column definition."""
    name: str                         # column identifier
    data_type: str                    # SQL type; the parser stores it upper-cased
    nullable: bool = True             # False when NOT NULL is declared
    default: Optional[str] = None     # raw DEFAULT expression token, if any
    primary_key: bool = False         # inline PRIMARY KEY on this column
    unique: bool = False              # inline UNIQUE constraint
    references: Optional[str] = None  # 'table(column)' for inline REFERENCES
@dataclass
class Index:
    """Database index definition."""
    name: str                      # index identifier
    table: str                     # table the index is defined on
    columns: List[str]             # indexed column names, in order
    unique: bool = False           # True for CREATE UNIQUE INDEX
    partial: Optional[str] = None  # WHERE clause of a partial index, if any
@dataclass
class Table:
    """Database table definition."""
    name: str
    # Column name -> Column, in declaration order
    columns: Dict[str, Column] = field(default_factory=dict)
    # Indexes attached from CREATE INDEX statements
    indexes: List[Index] = field(default_factory=list)
    # Primary key column names (single-column or composite)
    primary_key: List[str] = field(default_factory=list)
    # Each entry: {'column': ..., 'ref_table': ..., 'ref_column': ...}
    foreign_keys: List[Dict] = field(default_factory=list)
@dataclass
class Issue:
    """Schema issue or recommendation."""
    severity: str  # 'error', 'warning', 'info'
    category: str  # 'index', 'naming', 'type', 'constraint', 'convention'
    table: str     # name of the affected table
    message: str   # human-readable description of the problem
    suggestion: Optional[str] = None  # optional remediation, often runnable SQL
class SQLParser:
    """Parse SQL DDL statements into Table/Column/Index dataclasses.

    The parser is regex-based and intentionally lenient: it targets the
    common DDL subset produced by typical schema dumps (CREATE TABLE and
    CREATE INDEX), not the full SQL grammar.
    """

    # CREATE TABLE [IF NOT EXISTS] name ( body );
    CREATE_TABLE_PATTERN = re.compile(
        r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s*\((.*?)\)\s*;',
        re.IGNORECASE | re.DOTALL
    )
    # CREATE [UNIQUE] INDEX [IF NOT EXISTS] name ON table (cols) [WHERE ...];
    CREATE_INDEX_PATTERN = re.compile(
        r'CREATE\s+(UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s+'
        r'ON\s+["`]?(\w+)["`]?\s*\(([^)]+)\)(?:\s+WHERE\s+(.+?))?;',
        re.IGNORECASE | re.DOTALL
    )
    COLUMN_PATTERN = re.compile(
        r'["`]?(\w+)["`]?\s+'      # Column name
        r'(\w+(?:\s*\([^)]+\))?)'  # Data type, optionally parameterized
        r'([^,]*)',                # Trailing inline constraints
        re.IGNORECASE
    )
    FK_PATTERN = re.compile(
        r'FOREIGN\s+KEY\s*\(["`]?(\w+)["`]?\)\s+'
        r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
        re.IGNORECASE
    )
    # Table-level constraints that must not be mistaken for column definitions.
    TABLE_CONSTRAINT_PATTERN = re.compile(r'(?:UNIQUE|CHECK)\b', re.IGNORECASE)

    def parse(self, sql: str) -> Dict[str, Table]:
        """Parse SQL text and return a mapping of table name -> Table."""
        tables: Dict[str, Table] = {}
        for match in self.CREATE_TABLE_PATTERN.finditer(sql):
            table_name = match.group(1)
            body = match.group(2)
            tables[table_name] = self._parse_table_body(table_name, body)
        # Attach CREATE INDEX statements to their tables. Indexes on tables
        # not defined in this SQL text are silently ignored.
        for match in self.CREATE_INDEX_PATTERN.finditer(sql):
            unique = bool(match.group(1))
            index_name = match.group(2)
            table_name = match.group(3)
            columns = [c.strip().strip('"`') for c in match.group(4).split(',')]
            where_clause = match.group(5)
            index = Index(
                name=index_name,
                table=table_name,
                columns=columns,
                unique=unique,
                partial=where_clause.strip() if where_clause else None
            )
            if table_name in tables:
                tables[table_name].indexes.append(index)
        return tables

    def _parse_table_body(self, table_name: str, body: str) -> Table:
        """Parse a table body: column definitions and table-level constraints."""
        table = Table(name=table_name)
        for part in self._split_by_comma(body):
            part = part.strip()
            if not part:
                continue
            upper = part.upper()
            if upper.startswith('PRIMARY KEY'):
                self._parse_primary_key(table, part)
            elif upper.startswith('FOREIGN KEY'):
                self._parse_foreign_key(table, part)
            elif upper.startswith('CONSTRAINT'):
                # Named constraints: delegate to the same handlers.
                if 'PRIMARY KEY' in upper:
                    self._parse_primary_key(table, part)
                elif 'FOREIGN KEY' in upper:
                    self._parse_foreign_key(table, part)
            elif self.TABLE_CONSTRAINT_PATTERN.match(part):
                # BUGFIX: table-level UNIQUE(...)/CHECK(...) constraints used
                # to fall through to the column branch and be misparsed as
                # columns literally named 'UNIQUE' / 'CHECK'. Skip them.
                # (\b keeps columns like 'unique_code' out of this branch.)
                continue
            else:
                self._parse_column(table, part)
        return table

    def _parse_primary_key(self, table: Table, part: str) -> None:
        """Record a table-level PRIMARY KEY (col, ...) constraint."""
        pk_match = re.search(r'PRIMARY\s+KEY\s*\(([^)]+)\)', part, re.IGNORECASE)
        if pk_match:
            table.primary_key = [c.strip().strip('"`') for c in pk_match.group(1).split(',')]

    def _parse_foreign_key(self, table: Table, part: str) -> None:
        """Record a table-level FOREIGN KEY ... REFERENCES ... constraint."""
        fk_match = self.FK_PATTERN.search(part)
        if fk_match:
            table.foreign_keys.append({
                'column': fk_match.group(1),
                'ref_table': fk_match.group(2),
                'ref_column': fk_match.group(3),
            })

    def _parse_column(self, table: Table, part: str) -> None:
        """Parse one column definition, including its inline constraints."""
        col_match = self.COLUMN_PATTERN.match(part)
        if not col_match:
            return
        col_name = col_match.group(1)
        col_type = col_match.group(2)
        constraints = col_match.group(3).upper() if col_match.group(3) else ''
        column = Column(
            name=col_name,
            data_type=col_type.upper(),
            nullable='NOT NULL' not in constraints,
            primary_key='PRIMARY KEY' in constraints,
            unique='UNIQUE' in constraints,
        )
        # Raw DEFAULT token (single word; expressions are truncated at whitespace)
        default_match = re.search(r'DEFAULT\s+(\S+)', constraints, re.IGNORECASE)
        if default_match:
            column.default = default_match.group(1)
        # Inline REFERENCES also counts as a foreign key of the table.
        ref_match = re.search(
            r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
            constraints,
            re.IGNORECASE
        )
        if ref_match:
            column.references = f"{ref_match.group(1)}({ref_match.group(2)})"
            table.foreign_keys.append({
                'column': col_name,
                'ref_table': ref_match.group(1),
                'ref_column': ref_match.group(2),
            })
        if column.primary_key and col_name not in table.primary_key:
            table.primary_key.append(col_name)
        table.columns[col_name] = column

    def _split_by_comma(self, s: str) -> List[str]:
        """Split by top-level commas only, respecting nested parentheses."""
        parts = []
        current: List[str] = []
        depth = 0
        for char in s:
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
            elif char == ',' and depth == 0:
                parts.append(''.join(current))
                current = []
                continue
            current.append(char)
        if current:
            parts.append(''.join(current))
        return parts
class SchemaAnalyzer:
    """Analyze a parsed schema for errors, naming issues, and missing indexes."""

    # Suffixes that suggest a column is a foreign key
    FK_COLUMN_PATTERNS = ['_id', 'Id', '_ID']
    # Substrings of column names commonly used in WHERE filters
    FILTER_COLUMN_PATTERNS = ['status', 'state', 'type', 'category', 'active', 'enabled', 'deleted']
    # Column names commonly used for sorting/ordering
    # NOTE(review): not referenced by any check in this class; kept for
    # backward compatibility with external users.
    SORT_COLUMN_PATTERNS = ['created_at', 'updated_at', 'date', 'timestamp', 'order', 'position']

    def __init__(self, tables: Dict[str, Table]):
        self.tables = tables
        self.issues: List[Issue] = []

    def analyze(self) -> List[Issue]:
        """Run every check against every table and return the collected issues."""
        self.issues = []
        for table in self.tables.values():
            self._check_naming_conventions(table)
            self._check_primary_key(table)
            self._check_foreign_key_indexes(table)
            self._check_common_filter_columns(table)
            self._check_timestamp_columns(table)
            self._check_data_types(table)
        return self.issues

    @staticmethod
    def _indexed_columns(table: Table) -> Set[str]:
        """Columns covered by an explicit index or by the primary key."""
        indexed: Set[str] = set()
        for index in table.indexes:
            indexed.update(index.columns)
        # Primary key columns are implicitly indexed
        indexed.update(table.primary_key)
        return indexed

    def _check_naming_conventions(self, table: Table):
        """Flag names that deviate from lowercase / snake_case / plural conventions."""
        if table.name != table.name.lower():
            self.issues.append(Issue(
                severity='warning',
                category='naming',
                table=table.name,
                message=f"Table name '{table.name}' should be lowercase",
                suggestion=f"Rename to '{table.name.lower()}'"
            ))
        # Plural heuristic. The previous extra endswith('es') test was dead
        # code: any name ending in 'es' already ends in 's'.
        if not table.name.endswith('s'):
            self.issues.append(Issue(
                severity='info',
                category='naming',
                table=table.name,
                message=f"Table name '{table.name}' should typically be plural",
            ))
        for col_name in table.columns:
            if col_name != col_name.lower():
                self.issues.append(Issue(
                    severity='warning',
                    category='naming',
                    table=table.name,
                    message=f"Column '{col_name}' should use snake_case",
                    suggestion=f"Rename to '{self._to_snake_case(col_name)}'"
                ))

    def _check_primary_key(self, table: Table):
        """Every table should declare a primary key."""
        if not table.primary_key:
            self.issues.append(Issue(
                severity='error',
                category='constraint',
                table=table.name,
                message=f"Table '{table.name}' has no primary key",
                suggestion="Add a primary key column (e.g., 'id SERIAL PRIMARY KEY')"
            ))

    def _check_foreign_key_indexes(self, table: Table):
        """Declared FKs should be indexed; FK-looking columns get a softer note."""
        indexed_columns = self._indexed_columns(table)
        for fk in table.foreign_keys:
            fk_col = fk['column']
            if fk_col not in indexed_columns:
                self.issues.append(Issue(
                    severity='warning',
                    category='index',
                    table=table.name,
                    message=f"Foreign key column '{fk_col}' is not indexed",
                    suggestion=f"CREATE INDEX idx_{table.name}_{fk_col} ON {table.name}({fk_col});"
                ))
        # Columns that look like foreign keys but aren't declared as such
        for col_name in table.columns:
            if any(col_name.endswith(pattern) for pattern in self.FK_COLUMN_PATTERNS):
                if col_name not in indexed_columns:
                    is_declared_fk = any(fk['column'] == col_name for fk in table.foreign_keys)
                    if not is_declared_fk:
                        self.issues.append(Issue(
                            severity='info',
                            category='index',
                            table=table.name,
                            message=f"Column '{col_name}' looks like a foreign key but has no index",
                            suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
                        ))

    def _check_common_filter_columns(self, table: Table):
        """Columns whose names suggest frequent filtering should be indexed."""
        indexed_columns = self._indexed_columns(table)
        for col_name in table.columns:
            col_lower = col_name.lower()
            if any(pattern in col_lower for pattern in self.FILTER_COLUMN_PATTERNS):
                if col_name not in indexed_columns:
                    self.issues.append(Issue(
                        severity='info',
                        category='index',
                        table=table.name,
                        message=f"Column '{col_name}' is commonly used for filtering but has no index",
                        suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
                    ))

    def _check_timestamp_columns(self, table: Table):
        """Check that the conventional created_at/updated_at audit columns exist.

        (Docstring corrected: this check verifies column presence; it does not
        inspect indexes.)
        """
        if 'created_at' not in table.columns:
            self.issues.append(Issue(
                severity='info',
                category='convention',
                table=table.name,
                message=f"Table '{table.name}' has no 'created_at' column",
                suggestion="Consider adding: created_at TIMESTAMP DEFAULT NOW()"
            ))
        if 'updated_at' not in table.columns:
            self.issues.append(Issue(
                severity='info',
                category='convention',
                table=table.name,
                message=f"Table '{table.name}' has no 'updated_at' column",
                suggestion="Consider adding: updated_at TIMESTAMP DEFAULT NOW()"
            ))

    def _check_data_types(self, table: Table):
        """Flag suspicious data-type choices."""
        for col_name, col in table.columns.items():
            dtype = col.data_type.upper()
            # VARCHAR without an explicit length
            if 'VARCHAR' in dtype and '(' not in dtype:
                self.issues.append(Issue(
                    severity='warning',
                    category='type',
                    table=table.name,
                    message=f"Column '{col_name}' uses VARCHAR without length",
                    suggestion="Specify a maximum length, e.g., VARCHAR(255)"
                ))
            # Binary floating point is lossy for money
            if 'FLOAT' in dtype or 'DOUBLE' in dtype:
                if 'price' in col_name.lower() or 'amount' in col_name.lower() or 'total' in col_name.lower():
                    self.issues.append(Issue(
                        severity='warning',
                        category='type',
                        table=table.name,
                        message=f"Column '{col_name}' uses floating point for monetary value",
                        suggestion="Use DECIMAL or NUMERIC for monetary values"
                    ))
            # Short, well-known fields rarely need unbounded TEXT
            if dtype == 'TEXT':
                if 'email' in col_name.lower() or 'url' in col_name.lower():
                    self.issues.append(Issue(
                        severity='info',
                        category='type',
                        table=table.name,
                        message=f"Column '{col_name}' uses TEXT but might benefit from VARCHAR",
                        suggestion=f"Consider VARCHAR(255) for {col_name}"
                    ))

    def _to_snake_case(self, name: str) -> str:
        """Convert CamelCase / mixedCase to snake_case."""
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
class MigrationGenerator:
    """Generate forward (UP) and rollback (DOWN) migration SQL from two schemas."""

    def __init__(self, old_tables: Dict[str, Table], new_tables: Dict[str, Table]):
        self.old_tables = old_tables
        self.new_tables = new_tables

    def generate(self) -> Tuple[str, str]:
        """Generate migration scripts; returns (up_sql, down_sql)."""
        up_statements = []
        down_statements = []
        # Tables added in the new schema
        for table_name, table in self.new_tables.items():
            if table_name not in self.old_tables:
                up_statements.append(self._generate_create_table(table))
                down_statements.append(f"DROP TABLE IF EXISTS {table_name};")
        # Tables removed from the new schema
        for table_name, table in self.old_tables.items():
            if table_name not in self.new_tables:
                up_statements.append(f"DROP TABLE IF EXISTS {table_name};")
                down_statements.append(self._generate_create_table(table))
        # Tables present in both schemas: diff columns and indexes
        for table_name in set(self.old_tables.keys()) & set(self.new_tables.keys()):
            up, down = self._compare_tables(self.old_tables[table_name],
                                            self.new_tables[table_name])
            up_statements.extend(up)
            down_statements.extend(down)
        up_sql = '\n\n'.join(up_statements) if up_statements else '-- No changes'
        down_sql = '\n\n'.join(down_statements) if down_statements else '-- No changes'
        return up_sql, down_sql

    def _generate_create_table(self, table: Table) -> str:
        """Render a full CREATE TABLE statement for *table*."""
        lines = [f"CREATE TABLE {table.name} ("]
        col_defs = []
        for col_name, col in table.columns.items():
            col_def = f" {col_name} {col.data_type}"
            if not col.nullable:
                col_def += " NOT NULL"
            if col.default:
                col_def += f" DEFAULT {col.default}"
            # Inline PRIMARY KEY only when the key is single-column
            if col.primary_key and len(table.primary_key) == 1:
                col_def += " PRIMARY KEY"
            if col.unique:
                col_def += " UNIQUE"
            col_defs.append(col_def)
        # Composite primary key becomes a table-level constraint
        if len(table.primary_key) > 1:
            pk_cols = ', '.join(table.primary_key)
            col_defs.append(f" PRIMARY KEY ({pk_cols})")
        for fk in table.foreign_keys:
            col_defs.append(
                f" FOREIGN KEY ({fk['column']}) REFERENCES {fk['ref_table']}({fk['ref_column']})"
            )
        lines.append(',\n'.join(col_defs))
        lines.append(");")
        return '\n'.join(lines)

    def _compare_tables(self, old: Table, new: Table) -> Tuple[List[str], List[str]]:
        """Diff two versions of one table into (up, down) statement lists."""
        up = []
        down = []
        # Columns added
        for col_name, col in new.columns.items():
            if col_name not in old.columns:
                up.append(self._add_column_sql(new.name, col))
                down.append(f"ALTER TABLE {new.name} DROP COLUMN IF EXISTS {col_name};")
        # Columns removed
        for col_name, col in old.columns.items():
            if col_name not in new.columns:
                up.append(f"ALTER TABLE {old.name} DROP COLUMN IF EXISTS {col_name};")
                down.append(self._add_column_sql(old.name, col))
        # Column type changes
        for col_name in set(old.columns.keys()) & set(new.columns.keys()):
            old_col = old.columns[col_name]
            new_col = new.columns[col_name]
            if old_col.data_type != new_col.data_type:
                up.append(f"ALTER TABLE {new.name} ALTER COLUMN {col_name} TYPE {new_col.data_type};")
                down.append(f"ALTER TABLE {old.name} ALTER COLUMN {col_name} TYPE {old_col.data_type};")
        # Indexes added.
        # BUGFIX: this previously emitted CREATE INDEX CONCURRENTLY, but the
        # tool wraps generated migrations in BEGIN;...COMMIT; and PostgreSQL
        # rejects CONCURRENTLY inside a transaction block. It also made the
        # UP and DOWN index statements inconsistent with each other.
        old_index_names = {idx.name for idx in old.indexes}
        for idx in new.indexes:
            if idx.name not in old_index_names:
                up.append(self._create_index_sql(idx))
                down.append(f"DROP INDEX IF EXISTS {idx.name};")
        # Indexes removed
        new_index_names = {idx.name for idx in new.indexes}
        for idx in old.indexes:
            if idx.name not in new_index_names:
                up.append(f"DROP INDEX IF EXISTS {idx.name};")
                down.append(self._create_index_sql(idx))
        return up, down

    @staticmethod
    def _add_column_sql(table_name: str, col: Column) -> str:
        """Render an ADD COLUMN statement preserving NOT NULL and DEFAULT."""
        return (f"ALTER TABLE {table_name} ADD COLUMN {col.name} {col.data_type}"
                + (" NOT NULL" if not col.nullable else "")
                + (f" DEFAULT {col.default}" if col.default else "") + ";")

    @staticmethod
    def _create_index_sql(idx: Index) -> str:
        """Render a CREATE [UNIQUE] INDEX statement with optional partial WHERE."""
        unique = "UNIQUE " if idx.unique else ""
        cols = ', '.join(idx.columns)
        where = f" WHERE {idx.partial}" if idx.partial else ""
        return f"CREATE {unique}INDEX {idx.name} ON {idx.table}({cols}){where};"
class DatabaseMigrationTool:
    """Main tool: parses schema files and dispatches to the requested mode."""

    def __init__(self, schema_path: str, compare_path: Optional[str] = None,
                 output_dir: Optional[str] = None, verbose: bool = False):
        self.schema_path = Path(schema_path)
        self.compare_path = Path(compare_path) if compare_path else None
        self.output_dir = Path(output_dir) if output_dir else None
        self.verbose = verbose
        self.parser = SQLParser()

    def run(self, mode: str = 'analyze') -> Dict:
        """Execute the tool in *mode* ('analyze', 'compare', 'suggest-indexes').

        Returns a JSON-serializable result dict.
        Raises FileNotFoundError for a missing schema file, ValueError for an
        unknown mode.
        """
        print("Database Migration Tool")
        print(f"Schema: {self.schema_path}")
        print("-" * 50)
        if not self.schema_path.exists():
            raise FileNotFoundError(f"Schema file not found: {self.schema_path}")
        # Explicit encoding: read_text() without one is locale-dependent.
        schema_sql = self.schema_path.read_text(encoding='utf-8')
        tables = self.parser.parse(schema_sql)
        if self.verbose:
            print(f"Parsed {len(tables)} tables")
        if mode == 'analyze':
            return self._analyze(tables)
        elif mode == 'compare':
            return self._compare(tables)
        elif mode == 'suggest-indexes':
            return self._suggest_indexes(tables)
        else:
            raise ValueError(f"Unknown mode: {mode}")

    def _analyze(self, tables: Dict[str, Table]) -> Dict:
        """Analyze the schema and print/return issues grouped by severity."""
        analyzer = SchemaAnalyzer(tables)
        issues = analyzer.analyze()
        errors = [i for i in issues if i.severity == 'error']
        warnings = [i for i in issues if i.severity == 'warning']
        infos = [i for i in issues if i.severity == 'info']
        print("\nAnalysis Results:")
        print(f" Tables: {len(tables)}")
        print(f" Errors: {len(errors)}")
        print(f" Warnings: {len(warnings)}")
        print(f" Suggestions: {len(infos)}")
        if errors:
            print("\nERRORS:")
            for issue in errors:
                print(f" [{issue.table}] {issue.message}")
                if issue.suggestion:
                    print(f" Suggestion: {issue.suggestion}")
        if warnings:
            print("\nWARNINGS:")
            for issue in warnings:
                print(f" [{issue.table}] {issue.message}")
                if issue.suggestion:
                    print(f" Suggestion: {issue.suggestion}")
        # Info-level suggestions are only shown in verbose mode
        if self.verbose and infos:
            print("\nSUGGESTIONS:")
            for issue in infos:
                print(f" [{issue.table}] {issue.message}")
                if issue.suggestion:
                    print(f" {issue.suggestion}")
        return {
            'status': 'success',
            'tables_count': len(tables),
            'issues': {
                'errors': len(errors),
                'warnings': len(warnings),
                'suggestions': len(infos),
            },
            'issues_detail': [asdict(i) for i in issues],
        }

    def _compare(self, old_tables: Dict[str, Table]) -> Dict:
        """Compare the schema with --compare's schema and emit a migration."""
        if not self.compare_path:
            raise ValueError("Compare path required for compare mode")
        if not self.compare_path.exists():
            raise FileNotFoundError(f"Compare file not found: {self.compare_path}")
        new_sql = self.compare_path.read_text(encoding='utf-8')
        new_tables = self.parser.parse(new_sql)
        generator = MigrationGenerator(old_tables, new_tables)
        up_sql, down_sql = generator.generate()
        print("\nComparing schemas:")
        print(f" Old: {self.schema_path}")
        print(f" New: {self.compare_path}")
        added_tables = set(new_tables.keys()) - set(old_tables.keys())
        removed_tables = set(old_tables.keys()) - set(new_tables.keys())
        print("\nChanges detected:")
        print(f" Added tables: {len(added_tables)}")
        print(f" Removed tables: {len(removed_tables)}")
        if self.output_dir:
            # Write timestamped, transaction-wrapped migration + rollback files
            self.output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            up_file = self.output_dir / f"{timestamp}_migration.sql"
            down_file = self.output_dir / f"{timestamp}_migration_rollback.sql"
            up_file.write_text(f"-- Migration: {self.schema_path} -> {self.compare_path}\n"
                               f"-- Generated: {datetime.now().isoformat()}\n\n"
                               f"BEGIN;\n\n{up_sql}\n\nCOMMIT;\n",
                               encoding='utf-8')
            down_file.write_text(f"-- Rollback for migration {timestamp}\n"
                                 f"-- Generated: {datetime.now().isoformat()}\n\n"
                                 f"BEGIN;\n\n{down_sql}\n\nCOMMIT;\n",
                                 encoding='utf-8')
            print("\nGenerated files:")
            print(f" Migration: {up_file}")
            print(f" Rollback: {down_file}")
        else:
            print("\n--- UP MIGRATION ---")
            print(up_sql)
            print("\n--- DOWN MIGRATION ---")
            print(down_sql)
        return {
            'status': 'success',
            'added_tables': list(added_tables),
            'removed_tables': list(removed_tables),
            'up_sql': up_sql,
            'down_sql': down_sql,
        }

    def _suggest_indexes(self, tables: Dict[str, Table]) -> Dict:
        """Suggest indexes for FK columns, filter columns, and timestamps."""
        suggestions = []
        for table_name, table in tables.items():
            # Columns already covered by an index or the primary key
            indexed = set()
            for idx in table.indexes:
                indexed.update(idx.columns)
            indexed.update(table.primary_key)
            # Declared foreign keys
            for fk in table.foreign_keys:
                if fk['column'] not in indexed:
                    suggestions.append({
                        'table': table_name,
                        'column': fk['column'],
                        'reason': 'Foreign key',
                        'sql': f"CREATE INDEX idx_{table_name}_{fk['column']} ON {table_name}({fk['column']});"
                    })
            # Heuristics based on column names
            for col_name in table.columns:
                if col_name in indexed:
                    continue
                col_lower = col_name.lower()
                # (The original repeated the "not in indexed" test here; it
                # was dead code after the `continue` guard above.)
                if col_name.endswith('_id'):
                    suggestions.append({
                        'table': table_name,
                        'column': col_name,
                        'reason': 'Likely foreign key',
                        'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
                    })
                elif col_lower in ['status', 'state', 'type', 'category']:
                    suggestions.append({
                        'table': table_name,
                        'column': col_name,
                        'reason': 'Common filter column',
                        'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
                    })
                elif col_lower in ['created_at', 'updated_at']:
                    suggestions.append({
                        'table': table_name,
                        'column': col_name,
                        'reason': 'Common sort column',
                        'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name} DESC);"
                    })
        print(f"\nIndex Suggestions ({len(suggestions)} found):")
        for s in suggestions:
            print(f"\n [{s['table']}.{s['column']}] {s['reason']}")
            print(f" {s['sql']}")
        if self.output_dir:
            self.output_dir.mkdir(parents=True, exist_ok=True)
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            output_file = self.output_dir / f"{timestamp}_add_indexes.sql"
            lines = [
                "-- Suggested indexes",
                f"-- Generated: {datetime.now().isoformat()}",
                "",
            ]
            for s in suggestions:
                lines.append(f"-- {s['table']}.{s['column']}: {s['reason']}")
                lines.append(s['sql'])
                lines.append("")
            output_file.write_text('\n'.join(lines), encoding='utf-8')
            print(f"\nWritten to: {output_file}")
        return {
            'status': 'success',
            'suggestions_count': len(suggestions),
            'suggestions': suggestions,
        }
def main():
    """CLI entry point: parse arguments, pick a mode, and run the tool."""
    arg_parser = argparse.ArgumentParser(
        description='Analyze SQL schemas and generate migrations',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  %(prog)s schema.sql --analyze
  %(prog)s old.sql --compare new.sql --output migrations/
  %(prog)s schema.sql --suggest-indexes --output migrations/
'''
    )
    arg_parser.add_argument('schema', help='Path to SQL schema file')
    arg_parser.add_argument('--analyze', action='store_true',
                            help='Analyze schema for issues and optimizations')
    arg_parser.add_argument('--compare', metavar='FILE',
                            help='Compare with another schema file and generate migration')
    arg_parser.add_argument('--suggest-indexes', action='store_true',
                            help='Generate index suggestions')
    arg_parser.add_argument('--output', '-o',
                            help='Output directory for generated files')
    arg_parser.add_argument('--verbose', '-v', action='store_true',
                            help='Enable verbose output')
    arg_parser.add_argument('--json', action='store_true',
                            help='Output results as JSON')
    opts = arg_parser.parse_args()

    # --compare takes precedence over --suggest-indexes; plain analysis is
    # the default (so --analyze is effectively a documented no-op flag).
    mode = ('compare' if opts.compare
            else 'suggest-indexes' if opts.suggest_indexes
            else 'analyze')

    try:
        tool = DatabaseMigrationTool(
            schema_path=opts.schema,
            compare_path=opts.compare,
            output_dir=opts.output,
            verbose=opts.verbose,
        )
        results = tool.run(mode=mode)
        if opts.json:
            print(json.dumps(results, indent=2))
    except Exception as exc:
        # Top-level boundary: report and exit non-zero.
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
if __name__ == '__main__':
    # Run the CLI only when executed as a script, not on import.
    main()