This commit is contained in:
@@ -1,81 +1,819 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Database Migration Tool
|
||||
Automated tool for senior backend tasks
|
||||
|
||||
Analyzes SQL schema files, detects potential issues, suggests indexes,
|
||||
and generates migration scripts with rollback support.
|
||||
|
||||
Usage:
|
||||
python database_migration_tool.py schema.sql --analyze
|
||||
python database_migration_tool.py old.sql --compare new.sql --output migrations/
|
||||
python database_migration_tool.py schema.sql --suggest-indexes
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Set, Tuple
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass, field, asdict
|
||||
|
||||
|
||||
@dataclass
|
||||
class Column:
|
||||
"""Database column definition."""
|
||||
name: str
|
||||
data_type: str
|
||||
nullable: bool = True
|
||||
default: Optional[str] = None
|
||||
primary_key: bool = False
|
||||
unique: bool = False
|
||||
references: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Index:
|
||||
"""Database index definition."""
|
||||
name: str
|
||||
table: str
|
||||
columns: List[str]
|
||||
unique: bool = False
|
||||
partial: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Table:
|
||||
"""Database table definition."""
|
||||
name: str
|
||||
columns: Dict[str, Column] = field(default_factory=dict)
|
||||
indexes: List[Index] = field(default_factory=list)
|
||||
primary_key: List[str] = field(default_factory=list)
|
||||
foreign_keys: List[Dict] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Issue:
|
||||
"""Schema issue or recommendation."""
|
||||
severity: str # 'error', 'warning', 'info'
|
||||
category: str # 'index', 'naming', 'type', 'constraint'
|
||||
table: str
|
||||
message: str
|
||||
suggestion: Optional[str] = None
|
||||
|
||||
|
||||
class SQLParser:
|
||||
"""Parse SQL DDL statements."""
|
||||
|
||||
# Common patterns
|
||||
CREATE_TABLE_PATTERN = re.compile(
|
||||
r'CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s*\((.*?)\)\s*;',
|
||||
re.IGNORECASE | re.DOTALL
|
||||
)
|
||||
|
||||
CREATE_INDEX_PATTERN = re.compile(
|
||||
r'CREATE\s+(UNIQUE\s+)?INDEX\s+(?:IF\s+NOT\s+EXISTS\s+)?["`]?(\w+)["`]?\s+'
|
||||
r'ON\s+["`]?(\w+)["`]?\s*\(([^)]+)\)(?:\s+WHERE\s+(.+?))?;',
|
||||
re.IGNORECASE | re.DOTALL
|
||||
)
|
||||
|
||||
COLUMN_PATTERN = re.compile(
|
||||
r'["`]?(\w+)["`]?\s+' # Column name
|
||||
r'(\w+(?:\s*\([^)]+\))?)' # Data type
|
||||
r'([^,]*)', # Constraints
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
FK_PATTERN = re.compile(
|
||||
r'FOREIGN\s+KEY\s*\(["`]?(\w+)["`]?\)\s+'
|
||||
r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
def parse(self, sql: str) -> Dict[str, Table]:
|
||||
"""Parse SQL and return table definitions."""
|
||||
tables = {}
|
||||
|
||||
# Parse CREATE TABLE statements
|
||||
for match in self.CREATE_TABLE_PATTERN.finditer(sql):
|
||||
table_name = match.group(1)
|
||||
body = match.group(2)
|
||||
table = self._parse_table_body(table_name, body)
|
||||
tables[table_name] = table
|
||||
|
||||
# Parse CREATE INDEX statements
|
||||
for match in self.CREATE_INDEX_PATTERN.finditer(sql):
|
||||
unique = bool(match.group(1))
|
||||
index_name = match.group(2)
|
||||
table_name = match.group(3)
|
||||
columns = [c.strip().strip('"`') for c in match.group(4).split(',')]
|
||||
where_clause = match.group(5)
|
||||
|
||||
index = Index(
|
||||
name=index_name,
|
||||
table=table_name,
|
||||
columns=columns,
|
||||
unique=unique,
|
||||
partial=where_clause.strip() if where_clause else None
|
||||
)
|
||||
|
||||
if table_name in tables:
|
||||
tables[table_name].indexes.append(index)
|
||||
|
||||
return tables
|
||||
|
||||
def _parse_table_body(self, table_name: str, body: str) -> Table:
|
||||
"""Parse table body (columns, constraints)."""
|
||||
table = Table(name=table_name)
|
||||
|
||||
# Split by comma, but respect parentheses
|
||||
parts = self._split_by_comma(body)
|
||||
|
||||
for part in parts:
|
||||
part = part.strip()
|
||||
|
||||
# Skip empty parts
|
||||
if not part:
|
||||
continue
|
||||
|
||||
# Check for PRIMARY KEY constraint
|
||||
if part.upper().startswith('PRIMARY KEY'):
|
||||
pk_match = re.search(r'PRIMARY\s+KEY\s*\(([^)]+)\)', part, re.IGNORECASE)
|
||||
if pk_match:
|
||||
cols = [c.strip().strip('"`') for c in pk_match.group(1).split(',')]
|
||||
table.primary_key = cols
|
||||
|
||||
# Check for FOREIGN KEY constraint
|
||||
elif part.upper().startswith('FOREIGN KEY'):
|
||||
fk_match = self.FK_PATTERN.search(part)
|
||||
if fk_match:
|
||||
table.foreign_keys.append({
|
||||
'column': fk_match.group(1),
|
||||
'ref_table': fk_match.group(2),
|
||||
'ref_column': fk_match.group(3),
|
||||
})
|
||||
|
||||
# Check for CONSTRAINT
|
||||
elif part.upper().startswith('CONSTRAINT'):
|
||||
# Handle named constraints
|
||||
if 'PRIMARY KEY' in part.upper():
|
||||
pk_match = re.search(r'PRIMARY\s+KEY\s*\(([^)]+)\)', part, re.IGNORECASE)
|
||||
if pk_match:
|
||||
cols = [c.strip().strip('"`') for c in pk_match.group(1).split(',')]
|
||||
table.primary_key = cols
|
||||
elif 'FOREIGN KEY' in part.upper():
|
||||
fk_match = self.FK_PATTERN.search(part)
|
||||
if fk_match:
|
||||
table.foreign_keys.append({
|
||||
'column': fk_match.group(1),
|
||||
'ref_table': fk_match.group(2),
|
||||
'ref_column': fk_match.group(3),
|
||||
})
|
||||
|
||||
# Regular column definition
|
||||
else:
|
||||
col_match = self.COLUMN_PATTERN.match(part)
|
||||
if col_match:
|
||||
col_name = col_match.group(1)
|
||||
col_type = col_match.group(2)
|
||||
constraints = col_match.group(3).upper() if col_match.group(3) else ''
|
||||
|
||||
column = Column(
|
||||
name=col_name,
|
||||
data_type=col_type.upper(),
|
||||
nullable='NOT NULL' not in constraints,
|
||||
primary_key='PRIMARY KEY' in constraints,
|
||||
unique='UNIQUE' in constraints,
|
||||
)
|
||||
|
||||
# Extract default value
|
||||
default_match = re.search(r'DEFAULT\s+(\S+)', constraints, re.IGNORECASE)
|
||||
if default_match:
|
||||
column.default = default_match.group(1)
|
||||
|
||||
# Extract references
|
||||
ref_match = re.search(
|
||||
r'REFERENCES\s+["`]?(\w+)["`]?\s*\(["`]?(\w+)["`]?\)',
|
||||
constraints,
|
||||
re.IGNORECASE
|
||||
)
|
||||
if ref_match:
|
||||
column.references = f"{ref_match.group(1)}({ref_match.group(2)})"
|
||||
table.foreign_keys.append({
|
||||
'column': col_name,
|
||||
'ref_table': ref_match.group(1),
|
||||
'ref_column': ref_match.group(2),
|
||||
})
|
||||
|
||||
if column.primary_key and col_name not in table.primary_key:
|
||||
table.primary_key.append(col_name)
|
||||
|
||||
table.columns[col_name] = column
|
||||
|
||||
return table
|
||||
|
||||
def _split_by_comma(self, s: str) -> List[str]:
|
||||
"""Split string by comma, respecting parentheses."""
|
||||
parts = []
|
||||
current = []
|
||||
depth = 0
|
||||
|
||||
for char in s:
|
||||
if char == '(':
|
||||
depth += 1
|
||||
elif char == ')':
|
||||
depth -= 1
|
||||
elif char == ',' and depth == 0:
|
||||
parts.append(''.join(current))
|
||||
current = []
|
||||
continue
|
||||
current.append(char)
|
||||
|
||||
if current:
|
||||
parts.append(''.join(current))
|
||||
|
||||
return parts
|
||||
|
||||
|
||||
class SchemaAnalyzer:
|
||||
"""Analyze database schema for issues and optimizations."""
|
||||
|
||||
# Columns that typically need indexes (foreign keys)
|
||||
FK_COLUMN_PATTERNS = ['_id', 'Id', '_ID']
|
||||
|
||||
# Columns that typically need indexes for filtering
|
||||
FILTER_COLUMN_PATTERNS = ['status', 'state', 'type', 'category', 'active', 'enabled', 'deleted']
|
||||
|
||||
# Columns that typically need indexes for sorting/ordering
|
||||
SORT_COLUMN_PATTERNS = ['created_at', 'updated_at', 'date', 'timestamp', 'order', 'position']
|
||||
|
||||
def __init__(self, tables: Dict[str, Table]):
|
||||
self.tables = tables
|
||||
self.issues: List[Issue] = []
|
||||
|
||||
def analyze(self) -> List[Issue]:
|
||||
"""Run all analysis checks."""
|
||||
self.issues = []
|
||||
|
||||
for table_name, table in self.tables.items():
|
||||
self._check_naming_conventions(table)
|
||||
self._check_primary_key(table)
|
||||
self._check_foreign_key_indexes(table)
|
||||
self._check_common_filter_columns(table)
|
||||
self._check_timestamp_columns(table)
|
||||
self._check_data_types(table)
|
||||
|
||||
return self.issues
|
||||
|
||||
def _check_naming_conventions(self, table: Table):
|
||||
"""Check table and column naming conventions."""
|
||||
# Table name should be lowercase
|
||||
if table.name != table.name.lower():
|
||||
self.issues.append(Issue(
|
||||
severity='warning',
|
||||
category='naming',
|
||||
table=table.name,
|
||||
message=f"Table name '{table.name}' should be lowercase",
|
||||
suggestion=f"Rename to '{table.name.lower()}'"
|
||||
))
|
||||
|
||||
# Table name should be plural (basic check)
|
||||
if not table.name.endswith('s') and not table.name.endswith('es'):
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='naming',
|
||||
table=table.name,
|
||||
message=f"Table name '{table.name}' should typically be plural",
|
||||
))
|
||||
|
||||
for col_name, col in table.columns.items():
|
||||
# Column names should be lowercase with underscores
|
||||
if col_name != col_name.lower():
|
||||
self.issues.append(Issue(
|
||||
severity='warning',
|
||||
category='naming',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' should use snake_case",
|
||||
suggestion=f"Rename to '{self._to_snake_case(col_name)}'"
|
||||
))
|
||||
|
||||
def _check_primary_key(self, table: Table):
|
||||
"""Check for missing primary key."""
|
||||
if not table.primary_key:
|
||||
self.issues.append(Issue(
|
||||
severity='error',
|
||||
category='constraint',
|
||||
table=table.name,
|
||||
message=f"Table '{table.name}' has no primary key",
|
||||
suggestion="Add a primary key column (e.g., 'id SERIAL PRIMARY KEY')"
|
||||
))
|
||||
|
||||
def _check_foreign_key_indexes(self, table: Table):
|
||||
"""Check that foreign key columns have indexes."""
|
||||
indexed_columns = set()
|
||||
for index in table.indexes:
|
||||
indexed_columns.update(index.columns)
|
||||
|
||||
# Primary key columns are implicitly indexed
|
||||
indexed_columns.update(table.primary_key)
|
||||
|
||||
for fk in table.foreign_keys:
|
||||
fk_col = fk['column']
|
||||
if fk_col not in indexed_columns:
|
||||
self.issues.append(Issue(
|
||||
severity='warning',
|
||||
category='index',
|
||||
table=table.name,
|
||||
message=f"Foreign key column '{fk_col}' is not indexed",
|
||||
suggestion=f"CREATE INDEX idx_{table.name}_{fk_col} ON {table.name}({fk_col});"
|
||||
))
|
||||
|
||||
# Also check columns that look like foreign keys but aren't declared
|
||||
for col_name in table.columns:
|
||||
if any(col_name.endswith(pattern) for pattern in self.FK_COLUMN_PATTERNS):
|
||||
if col_name not in indexed_columns:
|
||||
# Check if it's actually a declared FK
|
||||
is_declared_fk = any(fk['column'] == col_name for fk in table.foreign_keys)
|
||||
if not is_declared_fk:
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='index',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' looks like a foreign key but has no index",
|
||||
suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
|
||||
))
|
||||
|
||||
def _check_common_filter_columns(self, table: Table):
|
||||
"""Check for indexes on commonly filtered columns."""
|
||||
indexed_columns = set()
|
||||
for index in table.indexes:
|
||||
indexed_columns.update(index.columns)
|
||||
indexed_columns.update(table.primary_key)
|
||||
|
||||
for col_name in table.columns:
|
||||
col_lower = col_name.lower()
|
||||
if any(pattern in col_lower for pattern in self.FILTER_COLUMN_PATTERNS):
|
||||
if col_name not in indexed_columns:
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='index',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' is commonly used for filtering but has no index",
|
||||
suggestion=f"CREATE INDEX idx_{table.name}_{col_name} ON {table.name}({col_name});"
|
||||
))
|
||||
|
||||
def _check_timestamp_columns(self, table: Table):
|
||||
"""Check for indexes on timestamp columns used for sorting."""
|
||||
has_created_at = 'created_at' in table.columns
|
||||
has_updated_at = 'updated_at' in table.columns
|
||||
|
||||
if not has_created_at:
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='convention',
|
||||
table=table.name,
|
||||
message=f"Table '{table.name}' has no 'created_at' column",
|
||||
suggestion="Consider adding: created_at TIMESTAMP DEFAULT NOW()"
|
||||
))
|
||||
|
||||
if not has_updated_at:
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='convention',
|
||||
table=table.name,
|
||||
message=f"Table '{table.name}' has no 'updated_at' column",
|
||||
suggestion="Consider adding: updated_at TIMESTAMP DEFAULT NOW()"
|
||||
))
|
||||
|
||||
def _check_data_types(self, table: Table):
|
||||
"""Check for potential data type issues."""
|
||||
for col_name, col in table.columns.items():
|
||||
dtype = col.data_type.upper()
|
||||
|
||||
# Check for VARCHAR without length
|
||||
if 'VARCHAR' in dtype and '(' not in dtype:
|
||||
self.issues.append(Issue(
|
||||
severity='warning',
|
||||
category='type',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' uses VARCHAR without length",
|
||||
suggestion="Specify a maximum length, e.g., VARCHAR(255)"
|
||||
))
|
||||
|
||||
# Check for FLOAT/DOUBLE for monetary values
|
||||
if 'FLOAT' in dtype or 'DOUBLE' in dtype:
|
||||
if 'price' in col_name.lower() or 'amount' in col_name.lower() or 'total' in col_name.lower():
|
||||
self.issues.append(Issue(
|
||||
severity='warning',
|
||||
category='type',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' uses floating point for monetary value",
|
||||
suggestion="Use DECIMAL or NUMERIC for monetary values"
|
||||
))
|
||||
|
||||
# Check for TEXT columns that might benefit from length limits
|
||||
if dtype == 'TEXT':
|
||||
if 'email' in col_name.lower() or 'url' in col_name.lower():
|
||||
self.issues.append(Issue(
|
||||
severity='info',
|
||||
category='type',
|
||||
table=table.name,
|
||||
message=f"Column '{col_name}' uses TEXT but might benefit from VARCHAR",
|
||||
suggestion=f"Consider VARCHAR(255) for {col_name}"
|
||||
))
|
||||
|
||||
def _to_snake_case(self, name: str) -> str:
|
||||
"""Convert name to snake_case."""
|
||||
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
|
||||
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
|
||||
|
||||
|
||||
class MigrationGenerator:
|
||||
"""Generate migration scripts from schema differences."""
|
||||
|
||||
def __init__(self, old_tables: Dict[str, Table], new_tables: Dict[str, Table]):
|
||||
self.old_tables = old_tables
|
||||
self.new_tables = new_tables
|
||||
|
||||
def generate(self) -> Tuple[str, str]:
|
||||
"""Generate UP and DOWN migration scripts."""
|
||||
up_statements = []
|
||||
down_statements = []
|
||||
|
||||
# Find new tables
|
||||
for table_name, table in self.new_tables.items():
|
||||
if table_name not in self.old_tables:
|
||||
up_statements.append(self._generate_create_table(table))
|
||||
down_statements.append(f"DROP TABLE IF EXISTS {table_name};")
|
||||
|
||||
# Find removed tables
|
||||
for table_name, table in self.old_tables.items():
|
||||
if table_name not in self.new_tables:
|
||||
up_statements.append(f"DROP TABLE IF EXISTS {table_name};")
|
||||
down_statements.append(self._generate_create_table(table))
|
||||
|
||||
# Find modified tables
|
||||
for table_name in set(self.old_tables.keys()) & set(self.new_tables.keys()):
|
||||
old_table = self.old_tables[table_name]
|
||||
new_table = self.new_tables[table_name]
|
||||
up, down = self._compare_tables(old_table, new_table)
|
||||
up_statements.extend(up)
|
||||
down_statements.extend(down)
|
||||
|
||||
up_sql = '\n\n'.join(up_statements) if up_statements else '-- No changes'
|
||||
down_sql = '\n\n'.join(down_statements) if down_statements else '-- No changes'
|
||||
|
||||
return up_sql, down_sql
|
||||
|
||||
def _generate_create_table(self, table: Table) -> str:
|
||||
"""Generate CREATE TABLE statement."""
|
||||
lines = [f"CREATE TABLE {table.name} ("]
|
||||
|
||||
col_defs = []
|
||||
for col_name, col in table.columns.items():
|
||||
col_def = f" {col_name} {col.data_type}"
|
||||
if not col.nullable:
|
||||
col_def += " NOT NULL"
|
||||
if col.default:
|
||||
col_def += f" DEFAULT {col.default}"
|
||||
if col.primary_key and len(table.primary_key) == 1:
|
||||
col_def += " PRIMARY KEY"
|
||||
if col.unique:
|
||||
col_def += " UNIQUE"
|
||||
col_defs.append(col_def)
|
||||
|
||||
# Add composite primary key
|
||||
if len(table.primary_key) > 1:
|
||||
pk_cols = ', '.join(table.primary_key)
|
||||
col_defs.append(f" PRIMARY KEY ({pk_cols})")
|
||||
|
||||
# Add foreign keys
|
||||
for fk in table.foreign_keys:
|
||||
col_defs.append(
|
||||
f" FOREIGN KEY ({fk['column']}) REFERENCES {fk['ref_table']}({fk['ref_column']})"
|
||||
)
|
||||
|
||||
lines.append(',\n'.join(col_defs))
|
||||
lines.append(");")
|
||||
|
||||
return '\n'.join(lines)
|
||||
|
||||
def _compare_tables(self, old: Table, new: Table) -> Tuple[List[str], List[str]]:
|
||||
"""Compare two tables and generate ALTER statements."""
|
||||
up = []
|
||||
down = []
|
||||
|
||||
# New columns
|
||||
for col_name, col in new.columns.items():
|
||||
if col_name not in old.columns:
|
||||
up.append(f"ALTER TABLE {new.name} ADD COLUMN {col_name} {col.data_type}"
|
||||
+ (" NOT NULL" if not col.nullable else "")
|
||||
+ (f" DEFAULT {col.default}" if col.default else "") + ";")
|
||||
down.append(f"ALTER TABLE {new.name} DROP COLUMN IF EXISTS {col_name};")
|
||||
|
||||
# Removed columns
|
||||
for col_name, col in old.columns.items():
|
||||
if col_name not in new.columns:
|
||||
up.append(f"ALTER TABLE {old.name} DROP COLUMN IF EXISTS {col_name};")
|
||||
down.append(f"ALTER TABLE {old.name} ADD COLUMN {col_name} {col.data_type}"
|
||||
+ (" NOT NULL" if not col.nullable else "")
|
||||
+ (f" DEFAULT {col.default}" if col.default else "") + ";")
|
||||
|
||||
# Modified columns (type changes)
|
||||
for col_name in set(old.columns.keys()) & set(new.columns.keys()):
|
||||
old_col = old.columns[col_name]
|
||||
new_col = new.columns[col_name]
|
||||
|
||||
if old_col.data_type != new_col.data_type:
|
||||
up.append(f"ALTER TABLE {new.name} ALTER COLUMN {col_name} TYPE {new_col.data_type};")
|
||||
down.append(f"ALTER TABLE {old.name} ALTER COLUMN {col_name} TYPE {old_col.data_type};")
|
||||
|
||||
# New indexes
|
||||
old_index_names = {idx.name for idx in old.indexes}
|
||||
for idx in new.indexes:
|
||||
if idx.name not in old_index_names:
|
||||
unique = "UNIQUE " if idx.unique else ""
|
||||
cols = ', '.join(idx.columns)
|
||||
where = f" WHERE {idx.partial}" if idx.partial else ""
|
||||
up.append(f"CREATE {unique}INDEX CONCURRENTLY {idx.name} ON {idx.table}({cols}){where};")
|
||||
down.append(f"DROP INDEX IF EXISTS {idx.name};")
|
||||
|
||||
# Removed indexes
|
||||
new_index_names = {idx.name for idx in new.indexes}
|
||||
for idx in old.indexes:
|
||||
if idx.name not in new_index_names:
|
||||
unique = "UNIQUE " if idx.unique else ""
|
||||
cols = ', '.join(idx.columns)
|
||||
where = f" WHERE {idx.partial}" if idx.partial else ""
|
||||
up.append(f"DROP INDEX IF EXISTS {idx.name};")
|
||||
down.append(f"CREATE {unique}INDEX {idx.name} ON {idx.table}({cols}){where};")
|
||||
|
||||
return up, down
|
||||
|
||||
|
||||
class DatabaseMigrationTool:
|
||||
"""Main class for database migration tool functionality"""
|
||||
|
||||
def __init__(self, target_path: str, verbose: bool = False):
|
||||
self.target_path = Path(target_path)
|
||||
"""Main tool for database migration analysis."""
|
||||
|
||||
def __init__(self, schema_path: str, compare_path: Optional[str] = None,
|
||||
output_dir: Optional[str] = None, verbose: bool = False):
|
||||
self.schema_path = Path(schema_path)
|
||||
self.compare_path = Path(compare_path) if compare_path else None
|
||||
self.output_dir = Path(output_dir) if output_dir else None
|
||||
self.verbose = verbose
|
||||
self.results = {}
|
||||
|
||||
def run(self) -> Dict:
|
||||
"""Execute the main functionality"""
|
||||
print(f"🚀 Running {self.__class__.__name__}...")
|
||||
print(f"📁 Target: {self.target_path}")
|
||||
|
||||
try:
|
||||
self.validate_target()
|
||||
self.analyze()
|
||||
self.generate_report()
|
||||
|
||||
print("✅ Completed successfully!")
|
||||
return self.results
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Error: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
def validate_target(self):
|
||||
"""Validate the target path exists and is accessible"""
|
||||
if not self.target_path.exists():
|
||||
raise ValueError(f"Target path does not exist: {self.target_path}")
|
||||
|
||||
self.parser = SQLParser()
|
||||
|
||||
def run(self, mode: str = 'analyze') -> Dict:
|
||||
"""Execute the tool in specified mode."""
|
||||
print(f"Database Migration Tool")
|
||||
print(f"Schema: {self.schema_path}")
|
||||
print("-" * 50)
|
||||
|
||||
if not self.schema_path.exists():
|
||||
raise FileNotFoundError(f"Schema file not found: {self.schema_path}")
|
||||
|
||||
schema_sql = self.schema_path.read_text()
|
||||
tables = self.parser.parse(schema_sql)
|
||||
|
||||
if self.verbose:
|
||||
print(f"✓ Target validated: {self.target_path}")
|
||||
|
||||
def analyze(self):
|
||||
"""Perform the main analysis or operation"""
|
||||
if self.verbose:
|
||||
print("📊 Analyzing...")
|
||||
|
||||
# Main logic here
|
||||
self.results['status'] = 'success'
|
||||
self.results['target'] = str(self.target_path)
|
||||
self.results['findings'] = []
|
||||
|
||||
# Add analysis results
|
||||
if self.verbose:
|
||||
print(f"✓ Analysis complete: {len(self.results.get('findings', []))} findings")
|
||||
|
||||
def generate_report(self):
|
||||
"""Generate and display the report"""
|
||||
print("\n" + "="*50)
|
||||
print("REPORT")
|
||||
print("="*50)
|
||||
print(f"Target: {self.results.get('target')}")
|
||||
print(f"Status: {self.results.get('status')}")
|
||||
print(f"Findings: {len(self.results.get('findings', []))}")
|
||||
print("="*50 + "\n")
|
||||
print(f"Parsed {len(tables)} tables")
|
||||
|
||||
if mode == 'analyze':
|
||||
return self._analyze(tables)
|
||||
elif mode == 'compare':
|
||||
return self._compare(tables)
|
||||
elif mode == 'suggest-indexes':
|
||||
return self._suggest_indexes(tables)
|
||||
else:
|
||||
raise ValueError(f"Unknown mode: {mode}")
|
||||
|
||||
def _analyze(self, tables: Dict[str, Table]) -> Dict:
|
||||
"""Analyze schema for issues."""
|
||||
analyzer = SchemaAnalyzer(tables)
|
||||
issues = analyzer.analyze()
|
||||
|
||||
# Group by severity
|
||||
errors = [i for i in issues if i.severity == 'error']
|
||||
warnings = [i for i in issues if i.severity == 'warning']
|
||||
infos = [i for i in issues if i.severity == 'info']
|
||||
|
||||
print(f"\nAnalysis Results:")
|
||||
print(f" Tables: {len(tables)}")
|
||||
print(f" Errors: {len(errors)}")
|
||||
print(f" Warnings: {len(warnings)}")
|
||||
print(f" Suggestions: {len(infos)}")
|
||||
|
||||
if errors:
|
||||
print(f"\nERRORS:")
|
||||
for issue in errors:
|
||||
print(f" [{issue.table}] {issue.message}")
|
||||
if issue.suggestion:
|
||||
print(f" Suggestion: {issue.suggestion}")
|
||||
|
||||
if warnings:
|
||||
print(f"\nWARNINGS:")
|
||||
for issue in warnings:
|
||||
print(f" [{issue.table}] {issue.message}")
|
||||
if issue.suggestion:
|
||||
print(f" Suggestion: {issue.suggestion}")
|
||||
|
||||
if self.verbose and infos:
|
||||
print(f"\nSUGGESTIONS:")
|
||||
for issue in infos:
|
||||
print(f" [{issue.table}] {issue.message}")
|
||||
if issue.suggestion:
|
||||
print(f" {issue.suggestion}")
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'tables_count': len(tables),
|
||||
'issues': {
|
||||
'errors': len(errors),
|
||||
'warnings': len(warnings),
|
||||
'suggestions': len(infos),
|
||||
},
|
||||
'issues_detail': [asdict(i) for i in issues],
|
||||
}
|
||||
|
||||
def _compare(self, old_tables: Dict[str, Table]) -> Dict:
|
||||
"""Compare two schemas and generate migration."""
|
||||
if not self.compare_path:
|
||||
raise ValueError("Compare path required for compare mode")
|
||||
|
||||
if not self.compare_path.exists():
|
||||
raise FileNotFoundError(f"Compare file not found: {self.compare_path}")
|
||||
|
||||
new_sql = self.compare_path.read_text()
|
||||
new_tables = self.parser.parse(new_sql)
|
||||
|
||||
generator = MigrationGenerator(old_tables, new_tables)
|
||||
up_sql, down_sql = generator.generate()
|
||||
|
||||
print(f"\nComparing schemas:")
|
||||
print(f" Old: {self.schema_path}")
|
||||
print(f" New: {self.compare_path}")
|
||||
|
||||
# Calculate changes
|
||||
added_tables = set(new_tables.keys()) - set(old_tables.keys())
|
||||
removed_tables = set(old_tables.keys()) - set(new_tables.keys())
|
||||
|
||||
print(f"\nChanges detected:")
|
||||
print(f" Added tables: {len(added_tables)}")
|
||||
print(f" Removed tables: {len(removed_tables)}")
|
||||
|
||||
if self.output_dir:
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
|
||||
up_file = self.output_dir / f"{timestamp}_migration.sql"
|
||||
down_file = self.output_dir / f"{timestamp}_migration_rollback.sql"
|
||||
|
||||
up_file.write_text(f"-- Migration: {self.schema_path} -> {self.compare_path}\n"
|
||||
f"-- Generated: {datetime.now().isoformat()}\n\n"
|
||||
f"BEGIN;\n\n{up_sql}\n\nCOMMIT;\n")
|
||||
|
||||
down_file.write_text(f"-- Rollback for migration {timestamp}\n"
|
||||
f"-- Generated: {datetime.now().isoformat()}\n\n"
|
||||
f"BEGIN;\n\n{down_sql}\n\nCOMMIT;\n")
|
||||
|
||||
print(f"\nGenerated files:")
|
||||
print(f" Migration: {up_file}")
|
||||
print(f" Rollback: {down_file}")
|
||||
else:
|
||||
print(f"\n--- UP MIGRATION ---")
|
||||
print(up_sql)
|
||||
print(f"\n--- DOWN MIGRATION ---")
|
||||
print(down_sql)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'added_tables': list(added_tables),
|
||||
'removed_tables': list(removed_tables),
|
||||
'up_sql': up_sql,
|
||||
'down_sql': down_sql,
|
||||
}
|
||||
|
||||
def _suggest_indexes(self, tables: Dict[str, Table]) -> Dict:
|
||||
"""Generate index suggestions."""
|
||||
suggestions = []
|
||||
|
||||
for table_name, table in tables.items():
|
||||
# Get existing indexed columns
|
||||
indexed = set()
|
||||
for idx in table.indexes:
|
||||
indexed.update(idx.columns)
|
||||
indexed.update(table.primary_key)
|
||||
|
||||
# Suggest indexes for foreign keys
|
||||
for fk in table.foreign_keys:
|
||||
if fk['column'] not in indexed:
|
||||
suggestions.append({
|
||||
'table': table_name,
|
||||
'column': fk['column'],
|
||||
'reason': 'Foreign key',
|
||||
'sql': f"CREATE INDEX idx_{table_name}_{fk['column']} ON {table_name}({fk['column']});"
|
||||
})
|
||||
|
||||
# Suggest indexes for common patterns
|
||||
for col_name in table.columns:
|
||||
if col_name in indexed:
|
||||
continue
|
||||
|
||||
col_lower = col_name.lower()
|
||||
|
||||
# Foreign key pattern
|
||||
if col_name.endswith('_id') and col_name not in indexed:
|
||||
suggestions.append({
|
||||
'table': table_name,
|
||||
'column': col_name,
|
||||
'reason': 'Likely foreign key',
|
||||
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
|
||||
})
|
||||
|
||||
# Status/type columns
|
||||
elif col_lower in ['status', 'state', 'type', 'category']:
|
||||
suggestions.append({
|
||||
'table': table_name,
|
||||
'column': col_name,
|
||||
'reason': 'Common filter column',
|
||||
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name});"
|
||||
})
|
||||
|
||||
# Timestamp columns
|
||||
elif col_lower in ['created_at', 'updated_at']:
|
||||
suggestions.append({
|
||||
'table': table_name,
|
||||
'column': col_name,
|
||||
'reason': 'Common sort column',
|
||||
'sql': f"CREATE INDEX idx_{table_name}_{col_name} ON {table_name}({col_name} DESC);"
|
||||
})
|
||||
|
||||
print(f"\nIndex Suggestions ({len(suggestions)} found):")
|
||||
for s in suggestions:
|
||||
print(f"\n [{s['table']}.{s['column']}] {s['reason']}")
|
||||
print(f" {s['sql']}")
|
||||
|
||||
if self.output_dir:
|
||||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||||
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
||||
output_file = self.output_dir / f"{timestamp}_add_indexes.sql"
|
||||
|
||||
lines = [
|
||||
f"-- Suggested indexes",
|
||||
f"-- Generated: {datetime.now().isoformat()}",
|
||||
"",
|
||||
]
|
||||
for s in suggestions:
|
||||
lines.append(f"-- {s['table']}.{s['column']}: {s['reason']}")
|
||||
lines.append(s['sql'])
|
||||
lines.append("")
|
||||
|
||||
output_file.write_text('\n'.join(lines))
|
||||
print(f"\nWritten to: {output_file}")
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'suggestions_count': len(suggestions),
|
||||
'suggestions': suggestions,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
"""CLI entry point."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Database Migration Tool"
|
||||
description='Analyze SQL schemas and generate migrations',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog='''
|
||||
Examples:
|
||||
%(prog)s schema.sql --analyze
|
||||
%(prog)s old.sql --compare new.sql --output migrations/
|
||||
%(prog)s schema.sql --suggest-indexes --output migrations/
|
||||
'''
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'schema',
|
||||
help='Path to SQL schema file'
|
||||
)
|
||||
parser.add_argument(
|
||||
'target',
|
||||
help='Target path to analyze or process'
|
||||
'--analyze',
|
||||
action='store_true',
|
||||
help='Analyze schema for issues and optimizations'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--compare',
|
||||
metavar='FILE',
|
||||
help='Compare with another schema file and generate migration'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--suggest-indexes',
|
||||
action='store_true',
|
||||
help='Generate index suggestions'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--output', '-o',
|
||||
help='Output directory for generated files'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--verbose', '-v',
|
||||
@@ -87,28 +825,34 @@ def main():
|
||||
action='store_true',
|
||||
help='Output results as JSON'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--output', '-o',
|
||||
help='Output file path'
|
||||
)
|
||||
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
tool = DatabaseMigrationTool(
|
||||
args.target,
|
||||
verbose=args.verbose
|
||||
)
|
||||
|
||||
results = tool.run()
|
||||
|
||||
if args.json:
|
||||
output = json.dumps(results, indent=2)
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
f.write(output)
|
||||
print(f"Results written to {args.output}")
|
||||
else:
|
||||
print(output)
|
||||
|
||||
# Determine mode
|
||||
if args.compare:
|
||||
mode = 'compare'
|
||||
elif args.suggest_indexes:
|
||||
mode = 'suggest-indexes'
|
||||
else:
|
||||
mode = 'analyze'
|
||||
|
||||
try:
|
||||
tool = DatabaseMigrationTool(
|
||||
schema_path=args.schema,
|
||||
compare_path=args.compare,
|
||||
output_dir=args.output,
|
||||
verbose=args.verbose,
|
||||
)
|
||||
|
||||
results = tool.run(mode=mode)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(results, indent=2))
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error: {e}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user