Complete overhaul of senior-data-engineer skill (previously Grade F: 43/100): SKILL.md (~550 lines): - Added table of contents and trigger phrases - 3 actionable workflows: Batch ETL Pipeline, Real-Time Streaming, Data Quality Framework - Architecture decision framework (Batch vs Stream, Lambda vs Kappa) - Tech stack overview with decision matrix - Troubleshooting section with common issues and solutions Reference Files (all rewritten from 81-line boilerplate): - data_pipeline_architecture.md (~700 lines): Lambda/Kappa architectures, batch processing with Spark, stream processing with Kafka/Flink, exactly-once semantics, error handling strategies, orchestration patterns - data_modeling_patterns.md (~650 lines): Dimensional modeling (Star/Snowflake/OBT), SCD Types 0-6 with SQL implementations, Data Vault (Hub/Satellite/Link), dbt best practices, partitioning and clustering strategies - dataops_best_practices.md (~750 lines): Data testing (Great Expectations, dbt), data contracts with YAML definitions, CI/CD pipelines, observability with OpenLineage, incident response runbooks, cost optimization Python Scripts (all rewritten from 101-line placeholders): - pipeline_orchestrator.py (~600 lines): Generates Airflow DAGs, Prefect flows, and Dagster jobs with configurable ETL patterns - data_quality_validator.py (~1640 lines): Schema validation, data profiling, Great Expectations suite generation, data contract validation, anomaly detection - etl_performance_optimizer.py (~1680 lines): SQL query analysis, Spark job optimization, partition strategy recommendations, cost estimation for BigQuery/Snowflake/Redshift/Databricks Resolves #53 Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1637 lines
59 KiB
Python
Executable File
1637 lines
59 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Data Quality Validator
|
|
Comprehensive data quality validation tool for data engineering workflows.
|
|
|
|
Features:
|
|
- Schema validation (types, nullability, constraints)
|
|
- Data profiling (statistics, distributions, patterns)
|
|
- Great Expectations suite generation
|
|
- Data contract validation
|
|
- Anomaly detection
|
|
- Quality scoring and reporting
|
|
|
|
Usage:
|
|
python data_quality_validator.py validate data.csv --schema schema.json
|
|
python data_quality_validator.py profile data.csv --output profile.json
|
|
python data_quality_validator.py generate-suite data.csv --output expectations.json
|
|
python data_quality_validator.py contract data.csv --contract contract.yaml
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import csv
|
|
import re
|
|
import argparse
|
|
import logging
|
|
import statistics
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any, Tuple, Set
|
|
from dataclasses import dataclass, field, asdict
|
|
from datetime import datetime
|
|
from collections import Counter
|
|
from abc import ABC, abstractmethod
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Data Classes
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class ColumnSchema:
|
|
"""Schema definition for a column"""
|
|
name: str
|
|
data_type: str # string, integer, float, boolean, date, datetime, email, uuid
|
|
nullable: bool = True
|
|
unique: bool = False
|
|
min_value: Optional[float] = None
|
|
max_value: Optional[float] = None
|
|
min_length: Optional[int] = None
|
|
max_length: Optional[int] = None
|
|
pattern: Optional[str] = None # regex pattern
|
|
allowed_values: Optional[List[str]] = None
|
|
description: str = ""
|
|
|
|
|
|
@dataclass
|
|
class DataSchema:
|
|
"""Complete schema for a dataset"""
|
|
name: str
|
|
version: str
|
|
columns: List[ColumnSchema]
|
|
primary_key: Optional[List[str]] = None
|
|
row_count_min: Optional[int] = None
|
|
row_count_max: Optional[int] = None
|
|
|
|
|
|
@dataclass
|
|
class ValidationResult:
|
|
"""Result of a single validation check"""
|
|
check_name: str
|
|
column: Optional[str]
|
|
passed: bool
|
|
expected: Any
|
|
actual: Any
|
|
severity: str = "error" # error, warning, info
|
|
message: str = ""
|
|
failed_rows: List[int] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class ColumnProfile:
|
|
"""Statistical profile of a column"""
|
|
name: str
|
|
data_type: str
|
|
total_count: int
|
|
null_count: int
|
|
null_percentage: float
|
|
unique_count: int
|
|
unique_percentage: float
|
|
# Numeric stats
|
|
min_value: Optional[float] = None
|
|
max_value: Optional[float] = None
|
|
mean: Optional[float] = None
|
|
median: Optional[float] = None
|
|
std_dev: Optional[float] = None
|
|
percentile_25: Optional[float] = None
|
|
percentile_75: Optional[float] = None
|
|
# String stats
|
|
min_length: Optional[int] = None
|
|
max_length: Optional[int] = None
|
|
avg_length: Optional[float] = None
|
|
# Pattern detection
|
|
detected_pattern: Optional[str] = None
|
|
top_values: List[Tuple[str, int]] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class DataProfile:
|
|
"""Complete profile of a dataset"""
|
|
name: str
|
|
row_count: int
|
|
column_count: int
|
|
columns: List[ColumnProfile]
|
|
duplicate_rows: int
|
|
memory_size_bytes: int
|
|
profile_timestamp: str
|
|
|
|
|
|
@dataclass
|
|
class QualityScore:
|
|
"""Overall quality score for a dataset"""
|
|
completeness: float # % of non-null values
|
|
uniqueness: float # % of unique values where expected
|
|
validity: float # % passing validation rules
|
|
consistency: float # % passing cross-column checks
|
|
accuracy: float # % matching expected patterns
|
|
overall: float # weighted average
|
|
|
|
|
|
# =============================================================================
|
|
# Type Detection
|
|
# =============================================================================
|
|
|
|
class TypeDetector:
|
|
"""Detect and infer data types from values"""
|
|
|
|
PATTERNS = {
|
|
'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
|
|
'uuid': r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
|
|
'phone': r'^\+?[\d\s\-\(\)]{10,}$',
|
|
'url': r'^https?://[^\s]+$',
|
|
'ipv4': r'^(\d{1,3}\.){3}\d{1,3}$',
|
|
'date_iso': r'^\d{4}-\d{2}-\d{2}$',
|
|
'datetime_iso': r'^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}',
|
|
'credit_card': r'^\d{4}[\s\-]?\d{4}[\s\-]?\d{4}[\s\-]?\d{4}$',
|
|
}
|
|
|
|
@classmethod
|
|
def detect_type(cls, values: List[str]) -> str:
|
|
"""Detect the most likely data type from a sample of values"""
|
|
non_empty = [v for v in values if v and v.strip()]
|
|
if not non_empty:
|
|
return "string"
|
|
|
|
# Check for patterns first
|
|
for pattern_name, pattern in cls.PATTERNS.items():
|
|
regex = re.compile(pattern, re.IGNORECASE)
|
|
matches = sum(1 for v in non_empty if regex.match(v.strip()))
|
|
if matches / len(non_empty) > 0.9:
|
|
return pattern_name
|
|
|
|
# Check for numeric types
|
|
int_count = 0
|
|
float_count = 0
|
|
bool_count = 0
|
|
|
|
for v in non_empty:
|
|
v = v.strip()
|
|
if v.lower() in ('true', 'false', 'yes', 'no', '1', '0'):
|
|
bool_count += 1
|
|
try:
|
|
int(v)
|
|
int_count += 1
|
|
except ValueError:
|
|
try:
|
|
float(v)
|
|
float_count += 1
|
|
except ValueError:
|
|
pass
|
|
|
|
if bool_count / len(non_empty) > 0.9:
|
|
return "boolean"
|
|
if int_count / len(non_empty) > 0.9:
|
|
return "integer"
|
|
if (int_count + float_count) / len(non_empty) > 0.9:
|
|
return "float"
|
|
|
|
return "string"
|
|
|
|
@classmethod
|
|
def detect_pattern(cls, values: List[str]) -> Optional[str]:
|
|
"""Try to detect a common pattern in string values"""
|
|
non_empty = [v for v in values if v and v.strip()]
|
|
if not non_empty or len(non_empty) < 10:
|
|
return None
|
|
|
|
for pattern_name, pattern in cls.PATTERNS.items():
|
|
regex = re.compile(pattern, re.IGNORECASE)
|
|
matches = sum(1 for v in non_empty if regex.match(v.strip()))
|
|
if matches / len(non_empty) > 0.8:
|
|
return pattern_name
|
|
|
|
return None
|
|
|
|
|
|
# =============================================================================
|
|
# Validators
|
|
# =============================================================================
|
|
|
|
class BaseValidator(ABC):
|
|
"""Base class for validators"""
|
|
|
|
@abstractmethod
|
|
def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
|
|
pass
|
|
|
|
|
|
class SchemaValidator(BaseValidator):
|
|
"""Validate data against a schema"""
|
|
|
|
def validate(self, data: List[Dict], schema: DataSchema) -> List[ValidationResult]:
|
|
results = []
|
|
|
|
if not data:
|
|
results.append(ValidationResult(
|
|
check_name="data_not_empty",
|
|
column=None,
|
|
passed=False,
|
|
expected="non-empty dataset",
|
|
actual="empty dataset",
|
|
severity="error",
|
|
message="Dataset is empty"
|
|
))
|
|
return results
|
|
|
|
# Validate row count
|
|
row_count = len(data)
|
|
if schema.row_count_min and row_count < schema.row_count_min:
|
|
results.append(ValidationResult(
|
|
check_name="row_count_min",
|
|
column=None,
|
|
passed=False,
|
|
expected=f">= {schema.row_count_min}",
|
|
actual=row_count,
|
|
severity="error",
|
|
message=f"Row count {row_count} is below minimum {schema.row_count_min}"
|
|
))
|
|
|
|
if schema.row_count_max and row_count > schema.row_count_max:
|
|
results.append(ValidationResult(
|
|
check_name="row_count_max",
|
|
column=None,
|
|
passed=False,
|
|
expected=f"<= {schema.row_count_max}",
|
|
actual=row_count,
|
|
severity="warning",
|
|
message=f"Row count {row_count} exceeds maximum {schema.row_count_max}"
|
|
))
|
|
|
|
# Validate each column
|
|
for col_schema in schema.columns:
|
|
col_results = self._validate_column(data, col_schema)
|
|
results.extend(col_results)
|
|
|
|
# Validate primary key uniqueness
|
|
if schema.primary_key:
|
|
pk_results = self._validate_primary_key(data, schema.primary_key)
|
|
results.extend(pk_results)
|
|
|
|
return results
|
|
|
|
def _validate_column(self, data: List[Dict], col_schema: ColumnSchema) -> List[ValidationResult]:
|
|
results = []
|
|
col_name = col_schema.name
|
|
|
|
# Check column exists
|
|
if data and col_name not in data[0]:
|
|
results.append(ValidationResult(
|
|
check_name="column_exists",
|
|
column=col_name,
|
|
passed=False,
|
|
expected="column present",
|
|
actual="column missing",
|
|
severity="error",
|
|
message=f"Column '{col_name}' not found in data"
|
|
))
|
|
return results
|
|
|
|
values = [row.get(col_name) for row in data]
|
|
failed_rows = []
|
|
|
|
# Null check
|
|
null_count = sum(1 for v in values if v is None or v == '')
|
|
if not col_schema.nullable and null_count > 0:
|
|
failed_rows = [i for i, v in enumerate(values) if v is None or v == '']
|
|
results.append(ValidationResult(
|
|
check_name="not_null",
|
|
column=col_name,
|
|
passed=False,
|
|
expected="no nulls",
|
|
actual=f"{null_count} nulls",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has {null_count} null values but is not nullable",
|
|
failed_rows=failed_rows[:100] # Limit to first 100
|
|
))
|
|
|
|
non_null_values = [v for v in values if v is not None and v != '']
|
|
|
|
# Uniqueness check
|
|
if col_schema.unique and non_null_values:
|
|
unique_count = len(set(non_null_values))
|
|
if unique_count != len(non_null_values):
|
|
duplicate_values = [v for v, count in Counter(non_null_values).items() if count > 1]
|
|
results.append(ValidationResult(
|
|
check_name="unique",
|
|
column=col_name,
|
|
passed=False,
|
|
expected="all unique",
|
|
actual=f"{len(non_null_values) - unique_count} duplicates",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has duplicate values: {duplicate_values[:5]}"
|
|
))
|
|
|
|
# Type validation
|
|
type_failures = self._validate_type(non_null_values, col_schema.data_type)
|
|
if type_failures:
|
|
results.append(ValidationResult(
|
|
check_name="data_type",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=col_schema.data_type,
|
|
actual=f"{len(type_failures)} invalid values",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has {len(type_failures)} values not matching type {col_schema.data_type}",
|
|
failed_rows=type_failures[:100]
|
|
))
|
|
|
|
# Range validation for numeric columns
|
|
if col_schema.min_value is not None or col_schema.max_value is not None:
|
|
range_failures = self._validate_range(non_null_values, col_schema)
|
|
if range_failures:
|
|
results.append(ValidationResult(
|
|
check_name="value_range",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=f"[{col_schema.min_value}, {col_schema.max_value}]",
|
|
actual=f"{len(range_failures)} out of range",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has values outside range",
|
|
failed_rows=range_failures[:100]
|
|
))
|
|
|
|
# Length validation for string columns
|
|
if col_schema.min_length is not None or col_schema.max_length is not None:
|
|
length_failures = self._validate_length(non_null_values, col_schema)
|
|
if length_failures:
|
|
results.append(ValidationResult(
|
|
check_name="string_length",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=f"length [{col_schema.min_length}, {col_schema.max_length}]",
|
|
actual=f"{len(length_failures)} out of range",
|
|
severity="warning",
|
|
message=f"Column '{col_name}' has values with invalid length",
|
|
failed_rows=length_failures[:100]
|
|
))
|
|
|
|
# Pattern validation
|
|
if col_schema.pattern:
|
|
pattern_failures = self._validate_pattern(non_null_values, col_schema.pattern)
|
|
if pattern_failures:
|
|
results.append(ValidationResult(
|
|
check_name="pattern_match",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=f"matches {col_schema.pattern}",
|
|
actual=f"{len(pattern_failures)} non-matching",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has values not matching pattern",
|
|
failed_rows=pattern_failures[:100]
|
|
))
|
|
|
|
# Allowed values validation
|
|
if col_schema.allowed_values:
|
|
allowed_set = set(col_schema.allowed_values)
|
|
invalid = [i for i, v in enumerate(non_null_values) if str(v) not in allowed_set]
|
|
if invalid:
|
|
results.append(ValidationResult(
|
|
check_name="allowed_values",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=f"one of {col_schema.allowed_values}",
|
|
actual=f"{len(invalid)} invalid values",
|
|
severity="error",
|
|
message=f"Column '{col_name}' has values not in allowed list",
|
|
failed_rows=invalid[:100]
|
|
))
|
|
|
|
return results
|
|
|
|
def _validate_type(self, values: List[Any], expected_type: str) -> List[int]:
|
|
"""Return indices of values that don't match expected type"""
|
|
failures = []
|
|
|
|
for i, v in enumerate(values):
|
|
v_str = str(v)
|
|
valid = False
|
|
|
|
if expected_type == "integer":
|
|
try:
|
|
int(v_str)
|
|
valid = True
|
|
except ValueError:
|
|
pass
|
|
elif expected_type == "float":
|
|
try:
|
|
float(v_str)
|
|
valid = True
|
|
except ValueError:
|
|
pass
|
|
elif expected_type == "boolean":
|
|
valid = v_str.lower() in ('true', 'false', 'yes', 'no', '1', '0')
|
|
elif expected_type == "email":
|
|
valid = bool(re.match(TypeDetector.PATTERNS['email'], v_str, re.IGNORECASE))
|
|
elif expected_type == "uuid":
|
|
valid = bool(re.match(TypeDetector.PATTERNS['uuid'], v_str, re.IGNORECASE))
|
|
elif expected_type in ("date", "date_iso"):
|
|
valid = bool(re.match(TypeDetector.PATTERNS['date_iso'], v_str))
|
|
elif expected_type in ("datetime", "datetime_iso"):
|
|
valid = bool(re.match(TypeDetector.PATTERNS['datetime_iso'], v_str))
|
|
else:
|
|
valid = True # string accepts anything
|
|
|
|
if not valid:
|
|
failures.append(i)
|
|
|
|
return failures
|
|
|
|
def _validate_range(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
|
|
"""Return indices of values outside the specified range"""
|
|
failures = []
|
|
for i, v in enumerate(values):
|
|
try:
|
|
num = float(v)
|
|
if col_schema.min_value is not None and num < col_schema.min_value:
|
|
failures.append(i)
|
|
elif col_schema.max_value is not None and num > col_schema.max_value:
|
|
failures.append(i)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
return failures
|
|
|
|
def _validate_length(self, values: List[Any], col_schema: ColumnSchema) -> List[int]:
|
|
"""Return indices of values with invalid string length"""
|
|
failures = []
|
|
for i, v in enumerate(values):
|
|
length = len(str(v))
|
|
if col_schema.min_length is not None and length < col_schema.min_length:
|
|
failures.append(i)
|
|
elif col_schema.max_length is not None and length > col_schema.max_length:
|
|
failures.append(i)
|
|
return failures
|
|
|
|
def _validate_pattern(self, values: List[Any], pattern: str) -> List[int]:
|
|
"""Return indices of values not matching the pattern"""
|
|
regex = re.compile(pattern)
|
|
return [i for i, v in enumerate(values) if not regex.match(str(v))]
|
|
|
|
def _validate_primary_key(self, data: List[Dict], pk_columns: List[str]) -> List[ValidationResult]:
|
|
"""Validate primary key uniqueness"""
|
|
results = []
|
|
pk_values = []
|
|
|
|
for row in data:
|
|
pk = tuple(row.get(col) for col in pk_columns)
|
|
pk_values.append(pk)
|
|
|
|
pk_counts = Counter(pk_values)
|
|
duplicates = {pk: count for pk, count in pk_counts.items() if count > 1}
|
|
|
|
if duplicates:
|
|
results.append(ValidationResult(
|
|
check_name="primary_key_unique",
|
|
column=",".join(pk_columns),
|
|
passed=False,
|
|
expected="all unique",
|
|
actual=f"{len(duplicates)} duplicate keys",
|
|
severity="error",
|
|
message=f"Primary key has {len(duplicates)} duplicate combinations"
|
|
))
|
|
|
|
return results
|
|
|
|
|
|
class AnomalyDetector(BaseValidator):
|
|
"""Detect anomalies in data"""
|
|
|
|
def __init__(self, z_threshold: float = 3.0, iqr_multiplier: float = 1.5):
|
|
self.z_threshold = z_threshold
|
|
self.iqr_multiplier = iqr_multiplier
|
|
|
|
def validate(self, data: List[Dict], schema: Optional[DataSchema] = None) -> List[ValidationResult]:
|
|
results = []
|
|
|
|
if not data:
|
|
return results
|
|
|
|
# Get numeric columns
|
|
numeric_columns = []
|
|
for col in data[0].keys():
|
|
values = [row.get(col) for row in data]
|
|
non_null = [v for v in values if v is not None and v != '']
|
|
try:
|
|
[float(v) for v in non_null[:100]]
|
|
numeric_columns.append(col)
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
for col in numeric_columns:
|
|
col_results = self._detect_numeric_anomalies(data, col)
|
|
results.extend(col_results)
|
|
|
|
return results
|
|
|
|
def _detect_numeric_anomalies(self, data: List[Dict], column: str) -> List[ValidationResult]:
|
|
results = []
|
|
|
|
values = []
|
|
for row in data:
|
|
v = row.get(column)
|
|
if v is not None and v != '':
|
|
try:
|
|
values.append(float(v))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
if len(values) < 10:
|
|
return results
|
|
|
|
# Z-score method
|
|
mean = statistics.mean(values)
|
|
std = statistics.stdev(values) if len(values) > 1 else 0
|
|
|
|
if std > 0:
|
|
z_outliers = []
|
|
for i, v in enumerate(values):
|
|
z_score = abs((v - mean) / std)
|
|
if z_score > self.z_threshold:
|
|
z_outliers.append((i, v, z_score))
|
|
|
|
if z_outliers:
|
|
results.append(ValidationResult(
|
|
check_name="z_score_outlier",
|
|
column=column,
|
|
passed=len(z_outliers) == 0,
|
|
expected=f"z-score <= {self.z_threshold}",
|
|
actual=f"{len(z_outliers)} outliers",
|
|
severity="warning",
|
|
message=f"Column '{column}' has {len(z_outliers)} statistical outliers (z-score method)",
|
|
failed_rows=[o[0] for o in z_outliers[:100]]
|
|
))
|
|
|
|
# IQR method
|
|
sorted_values = sorted(values)
|
|
q1_idx = len(sorted_values) // 4
|
|
q3_idx = (3 * len(sorted_values)) // 4
|
|
q1 = sorted_values[q1_idx]
|
|
q3 = sorted_values[q3_idx]
|
|
iqr = q3 - q1
|
|
|
|
lower_bound = q1 - self.iqr_multiplier * iqr
|
|
upper_bound = q3 + self.iqr_multiplier * iqr
|
|
|
|
iqr_outliers = [(i, v) for i, v in enumerate(values) if v < lower_bound or v > upper_bound]
|
|
|
|
if iqr_outliers:
|
|
results.append(ValidationResult(
|
|
check_name="iqr_outlier",
|
|
column=column,
|
|
passed=len(iqr_outliers) == 0,
|
|
expected=f"value in [{lower_bound:.2f}, {upper_bound:.2f}]",
|
|
actual=f"{len(iqr_outliers)} outliers",
|
|
severity="warning",
|
|
message=f"Column '{column}' has {len(iqr_outliers)} outliers (IQR method)",
|
|
failed_rows=[o[0] for o in iqr_outliers[:100]]
|
|
))
|
|
|
|
return results
|
|
|
|
|
|
# =============================================================================
|
|
# Data Profiler
|
|
# =============================================================================
|
|
|
|
class DataProfiler:
|
|
"""Generate statistical profiles of datasets"""
|
|
|
|
def profile(self, data: List[Dict], name: str = "dataset") -> DataProfile:
|
|
"""Generate a complete profile of the dataset"""
|
|
if not data:
|
|
return DataProfile(
|
|
name=name,
|
|
row_count=0,
|
|
column_count=0,
|
|
columns=[],
|
|
duplicate_rows=0,
|
|
memory_size_bytes=0,
|
|
profile_timestamp=datetime.now().isoformat()
|
|
)
|
|
|
|
columns = list(data[0].keys())
|
|
column_profiles = []
|
|
|
|
for col in columns:
|
|
profile = self._profile_column(data, col)
|
|
column_profiles.append(profile)
|
|
|
|
# Count duplicates
|
|
row_tuples = [tuple(sorted(row.items())) for row in data]
|
|
duplicate_count = len(row_tuples) - len(set(row_tuples))
|
|
|
|
# Estimate memory size
|
|
memory_size = sys.getsizeof(data) + sum(
|
|
sys.getsizeof(row) + sum(sys.getsizeof(v) for v in row.values())
|
|
for row in data
|
|
)
|
|
|
|
return DataProfile(
|
|
name=name,
|
|
row_count=len(data),
|
|
column_count=len(columns),
|
|
columns=column_profiles,
|
|
duplicate_rows=duplicate_count,
|
|
memory_size_bytes=memory_size,
|
|
profile_timestamp=datetime.now().isoformat()
|
|
)
|
|
|
|
def _profile_column(self, data: List[Dict], column: str) -> ColumnProfile:
|
|
"""Generate profile for a single column"""
|
|
values = [row.get(column) for row in data]
|
|
non_null = [v for v in values if v is not None and v != '']
|
|
|
|
total_count = len(values)
|
|
null_count = total_count - len(non_null)
|
|
null_pct = (null_count / total_count * 100) if total_count > 0 else 0
|
|
|
|
unique_values = set(str(v) for v in non_null)
|
|
unique_count = len(unique_values)
|
|
unique_pct = (unique_count / len(non_null) * 100) if non_null else 0
|
|
|
|
# Detect type
|
|
sample = [str(v) for v in non_null[:1000]]
|
|
detected_type = TypeDetector.detect_type(sample)
|
|
detected_pattern = TypeDetector.detect_pattern(sample)
|
|
|
|
# Top values
|
|
value_counts = Counter(str(v) for v in non_null)
|
|
top_values = value_counts.most_common(10)
|
|
|
|
profile = ColumnProfile(
|
|
name=column,
|
|
data_type=detected_type,
|
|
total_count=total_count,
|
|
null_count=null_count,
|
|
null_percentage=null_pct,
|
|
unique_count=unique_count,
|
|
unique_percentage=unique_pct,
|
|
detected_pattern=detected_pattern,
|
|
top_values=top_values
|
|
)
|
|
|
|
# Add numeric stats if applicable
|
|
if detected_type in ('integer', 'float'):
|
|
numeric_values = []
|
|
for v in non_null:
|
|
try:
|
|
numeric_values.append(float(v))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
if numeric_values:
|
|
sorted_vals = sorted(numeric_values)
|
|
profile.min_value = min(numeric_values)
|
|
profile.max_value = max(numeric_values)
|
|
profile.mean = statistics.mean(numeric_values)
|
|
profile.median = statistics.median(numeric_values)
|
|
if len(numeric_values) > 1:
|
|
profile.std_dev = statistics.stdev(numeric_values)
|
|
profile.percentile_25 = sorted_vals[len(sorted_vals) // 4]
|
|
profile.percentile_75 = sorted_vals[(3 * len(sorted_vals)) // 4]
|
|
|
|
# Add string stats
|
|
if detected_type == 'string':
|
|
lengths = [len(str(v)) for v in non_null]
|
|
if lengths:
|
|
profile.min_length = min(lengths)
|
|
profile.max_length = max(lengths)
|
|
profile.avg_length = statistics.mean(lengths)
|
|
|
|
return profile
|
|
|
|
|
|
# =============================================================================
|
|
# Great Expectations Suite Generator
|
|
# =============================================================================
|
|
|
|
class GreatExpectationsGenerator:
|
|
"""Generate Great Expectations validation suites"""
|
|
|
|
def generate_suite(self, profile: DataProfile) -> Dict:
|
|
"""Generate a Great Expectations suite from a data profile"""
|
|
expectations = []
|
|
|
|
for col_profile in profile.columns:
|
|
col_expectations = self._generate_column_expectations(col_profile)
|
|
expectations.extend(col_expectations)
|
|
|
|
# Table-level expectations
|
|
expectations.append({
|
|
"expectation_type": "expect_table_row_count_to_be_between",
|
|
"kwargs": {
|
|
"min_value": max(1, int(profile.row_count * 0.5)),
|
|
"max_value": int(profile.row_count * 2)
|
|
}
|
|
})
|
|
|
|
expectations.append({
|
|
"expectation_type": "expect_table_column_count_to_equal",
|
|
"kwargs": {
|
|
"value": profile.column_count
|
|
}
|
|
})
|
|
|
|
suite = {
|
|
"expectation_suite_name": f"{profile.name}_suite",
|
|
"expectations": expectations,
|
|
"meta": {
|
|
"generated_at": datetime.now().isoformat(),
|
|
"generator": "data_quality_validator",
|
|
"source_profile": profile.name
|
|
}
|
|
}
|
|
|
|
return suite
|
|
|
|
def _generate_column_expectations(self, col_profile: ColumnProfile) -> List[Dict]:
|
|
"""Generate expectations for a single column"""
|
|
expectations = []
|
|
col_name = col_profile.name
|
|
|
|
# Column exists
|
|
expectations.append({
|
|
"expectation_type": "expect_column_to_exist",
|
|
"kwargs": {"column": col_name}
|
|
})
|
|
|
|
# Null percentage
|
|
if col_profile.null_percentage < 1:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_not_be_null",
|
|
"kwargs": {"column": col_name}
|
|
})
|
|
elif col_profile.null_percentage < 50:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_not_be_null",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"mostly": 1 - (col_profile.null_percentage / 100 * 1.5)
|
|
}
|
|
})
|
|
|
|
# Uniqueness
|
|
if col_profile.unique_percentage > 99:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_unique",
|
|
"kwargs": {"column": col_name}
|
|
})
|
|
|
|
# Type-specific expectations
|
|
if col_profile.data_type == 'integer':
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_in_type_list",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"type_list": ["int", "int64", "INTEGER", "BIGINT"]
|
|
}
|
|
})
|
|
if col_profile.min_value is not None:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_between",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"min_value": col_profile.min_value,
|
|
"max_value": col_profile.max_value
|
|
}
|
|
})
|
|
|
|
elif col_profile.data_type == 'float':
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_in_type_list",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"type_list": ["float", "float64", "FLOAT", "DOUBLE"]
|
|
}
|
|
})
|
|
if col_profile.min_value is not None:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_between",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"min_value": col_profile.min_value,
|
|
"max_value": col_profile.max_value
|
|
}
|
|
})
|
|
|
|
elif col_profile.data_type == 'email':
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_match_regex",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"regex": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
|
|
}
|
|
})
|
|
|
|
elif col_profile.data_type in ('date_iso', 'date'):
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_match_strftime_format",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"strftime_format": "%Y-%m-%d"
|
|
}
|
|
})
|
|
|
|
# String length expectations
|
|
if col_profile.min_length is not None:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_value_lengths_to_be_between",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"min_value": max(1, col_profile.min_length),
|
|
"max_value": col_profile.max_length * 2 if col_profile.max_length else None
|
|
}
|
|
})
|
|
|
|
# Categorical (low cardinality) columns
|
|
if col_profile.unique_count <= 20 and col_profile.unique_percentage < 10:
|
|
top_values = [v[0] for v in col_profile.top_values if v[1] > col_profile.total_count * 0.01]
|
|
if top_values:
|
|
expectations.append({
|
|
"expectation_type": "expect_column_values_to_be_in_set",
|
|
"kwargs": {
|
|
"column": col_name,
|
|
"value_set": top_values,
|
|
"mostly": 0.95
|
|
}
|
|
})
|
|
|
|
return expectations
|
|
|
|
|
|
# =============================================================================
|
|
# Quality Score Calculator
|
|
# =============================================================================
|
|
|
|
class QualityScoreCalculator:
|
|
"""Calculate overall data quality scores"""
|
|
|
|
def calculate(self, profile: DataProfile, validation_results: List[ValidationResult]) -> QualityScore:
|
|
"""Calculate quality score from profile and validation results"""
|
|
# Completeness: average non-null percentage
|
|
completeness = 100 - statistics.mean([c.null_percentage for c in profile.columns]) if profile.columns else 0
|
|
|
|
# Uniqueness: average unique percentage for columns expected to be unique
|
|
unique_cols = [c for c in profile.columns if c.unique_percentage > 90]
|
|
uniqueness = statistics.mean([c.unique_percentage for c in unique_cols]) if unique_cols else 100
|
|
|
|
# Validity: percentage of passed checks
|
|
total_checks = len(validation_results)
|
|
passed_checks = sum(1 for r in validation_results if r.passed)
|
|
validity = (passed_checks / total_checks * 100) if total_checks > 0 else 100
|
|
|
|
# Consistency: percentage of non-error results
|
|
error_checks = sum(1 for r in validation_results if not r.passed and r.severity == "error")
|
|
consistency = ((total_checks - error_checks) / total_checks * 100) if total_checks > 0 else 100
|
|
|
|
# Accuracy: based on pattern matching and type detection
|
|
pattern_detected = sum(1 for c in profile.columns if c.detected_pattern)
|
|
accuracy = min(100, 50 + (pattern_detected / len(profile.columns) * 50)) if profile.columns else 50
|
|
|
|
# Overall: weighted average
|
|
overall = (
|
|
completeness * 0.25 +
|
|
uniqueness * 0.15 +
|
|
validity * 0.30 +
|
|
consistency * 0.20 +
|
|
accuracy * 0.10
|
|
)
|
|
|
|
return QualityScore(
|
|
completeness=round(completeness, 2),
|
|
uniqueness=round(uniqueness, 2),
|
|
validity=round(validity, 2),
|
|
consistency=round(consistency, 2),
|
|
accuracy=round(accuracy, 2),
|
|
overall=round(overall, 2)
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Data Contract Validator
|
|
# =============================================================================
|
|
|
|
class DataContractValidator:
|
|
"""Validate data against a data contract"""
|
|
|
|
def load_contract(self, contract_path: str) -> Dict:
|
|
"""Load a data contract from file"""
|
|
with open(contract_path, 'r') as f:
|
|
content = f.read()
|
|
|
|
# Support both YAML and JSON
|
|
if contract_path.endswith('.yaml') or contract_path.endswith('.yml'):
|
|
# Simple YAML parsing (for basic contracts)
|
|
contract = self._parse_simple_yaml(content)
|
|
else:
|
|
contract = json.loads(content)
|
|
|
|
return contract
|
|
|
|
def _parse_simple_yaml(self, content: str) -> Dict:
|
|
"""Parse simple YAML-like format"""
|
|
result = {}
|
|
current_section = result
|
|
section_stack = [(result, -1)]
|
|
|
|
for line in content.split('\n'):
|
|
if not line.strip() or line.strip().startswith('#'):
|
|
continue
|
|
|
|
# Calculate indentation
|
|
indent = len(line) - len(line.lstrip())
|
|
line = line.strip()
|
|
|
|
# Pop sections with greater or equal indentation
|
|
while section_stack and section_stack[-1][1] >= indent:
|
|
section_stack.pop()
|
|
|
|
current_section = section_stack[-1][0]
|
|
|
|
if ':' in line:
|
|
key, value = line.split(':', 1)
|
|
key = key.strip()
|
|
value = value.strip()
|
|
|
|
if value:
|
|
# Handle lists
|
|
if value.startswith('[') and value.endswith(']'):
|
|
current_section[key] = [v.strip().strip('"\'') for v in value[1:-1].split(',')]
|
|
elif value.lower() in ('true', 'false'):
|
|
current_section[key] = value.lower() == 'true'
|
|
elif value.isdigit():
|
|
current_section[key] = int(value)
|
|
else:
|
|
current_section[key] = value.strip('"\'')
|
|
else:
|
|
current_section[key] = {}
|
|
section_stack.append((current_section[key], indent))
|
|
elif line.startswith('- '):
|
|
# List item
|
|
if not isinstance(current_section, list):
|
|
# Convert to list
|
|
parent = section_stack[-2][0] if len(section_stack) > 1 else result
|
|
for k, v in parent.items():
|
|
if v is current_section:
|
|
parent[k] = [current_section] if current_section else []
|
|
current_section = parent[k]
|
|
section_stack[-1] = (current_section, section_stack[-1][1])
|
|
break
|
|
current_section.append(line[2:].strip())
|
|
|
|
return result
|
|
|
|
def validate_contract(self, data: List[Dict], contract: Dict) -> List[ValidationResult]:
|
|
"""Validate data against contract"""
|
|
results = []
|
|
|
|
# Validate schema section
|
|
if 'schema' in contract:
|
|
schema_def = contract['schema']
|
|
columns = schema_def.get('columns', schema_def.get('fields', []))
|
|
|
|
for col_def in columns:
|
|
col_name = col_def.get('name', col_def.get('column', ''))
|
|
if not col_name:
|
|
continue
|
|
|
|
# Check column exists
|
|
if data and col_name not in data[0]:
|
|
results.append(ValidationResult(
|
|
check_name="contract_column_exists",
|
|
column=col_name,
|
|
passed=False,
|
|
expected="column present",
|
|
actual="column missing",
|
|
severity="error",
|
|
message=f"Contract requires column '{col_name}' but it's missing"
|
|
))
|
|
continue
|
|
|
|
# Check data type
|
|
expected_type = col_def.get('type', col_def.get('data_type', 'string'))
|
|
values = [row.get(col_name) for row in data]
|
|
non_null = [str(v) for v in values if v is not None and v != '']
|
|
|
|
if non_null:
|
|
detected_type = TypeDetector.detect_type(non_null[:1000])
|
|
type_compatible = self._types_compatible(detected_type, expected_type)
|
|
|
|
if not type_compatible:
|
|
results.append(ValidationResult(
|
|
check_name="contract_data_type",
|
|
column=col_name,
|
|
passed=False,
|
|
expected=expected_type,
|
|
actual=detected_type,
|
|
severity="error",
|
|
message=f"Contract expects type '{expected_type}' but detected '{detected_type}'"
|
|
))
|
|
|
|
# Check nullable
|
|
if not col_def.get('nullable', True):
|
|
null_count = sum(1 for v in values if v is None or v == '')
|
|
if null_count > 0:
|
|
results.append(ValidationResult(
|
|
check_name="contract_not_null",
|
|
column=col_name,
|
|
passed=False,
|
|
expected="no nulls",
|
|
actual=f"{null_count} nulls",
|
|
severity="error",
|
|
message=f"Contract requires non-null but found {null_count} nulls"
|
|
))
|
|
|
|
# Validate SLA section
|
|
if 'sla' in contract:
|
|
sla = contract['sla']
|
|
|
|
# Row count bounds
|
|
min_rows = sla.get('min_rows', sla.get('minimum_records'))
|
|
max_rows = sla.get('max_rows', sla.get('maximum_records'))
|
|
|
|
row_count = len(data)
|
|
if min_rows and row_count < min_rows:
|
|
results.append(ValidationResult(
|
|
check_name="contract_min_rows",
|
|
column=None,
|
|
passed=False,
|
|
expected=f">= {min_rows} rows",
|
|
actual=f"{row_count} rows",
|
|
severity="error",
|
|
message=f"Contract requires at least {min_rows} rows"
|
|
))
|
|
|
|
if max_rows and row_count > max_rows:
|
|
results.append(ValidationResult(
|
|
check_name="contract_max_rows",
|
|
column=None,
|
|
passed=False,
|
|
expected=f"<= {max_rows} rows",
|
|
actual=f"{row_count} rows",
|
|
severity="warning",
|
|
message=f"Contract allows at most {max_rows} rows"
|
|
))
|
|
|
|
return results
|
|
|
|
def _types_compatible(self, detected: str, expected: str) -> bool:
|
|
"""Check if detected type is compatible with expected type"""
|
|
expected = expected.lower()
|
|
detected = detected.lower()
|
|
|
|
type_groups = {
|
|
'numeric': ['integer', 'int', 'float', 'double', 'decimal', 'number'],
|
|
'string': ['string', 'varchar', 'char', 'text'],
|
|
'boolean': ['boolean', 'bool'],
|
|
'date': ['date', 'date_iso'],
|
|
'datetime': ['datetime', 'datetime_iso', 'timestamp'],
|
|
}
|
|
|
|
for group, types in type_groups.items():
|
|
if expected in types and detected in types:
|
|
return True
|
|
|
|
return detected == expected
|
|
|
|
|
|
# =============================================================================
|
|
# Report Generator
|
|
# =============================================================================
|
|
|
|
class ReportGenerator:
|
|
"""Generate validation reports"""
|
|
|
|
def generate_text_report(self,
|
|
profile: DataProfile,
|
|
results: List[ValidationResult],
|
|
score: QualityScore) -> str:
|
|
"""Generate a text report"""
|
|
lines = []
|
|
lines.append("=" * 80)
|
|
lines.append("DATA QUALITY VALIDATION REPORT")
|
|
lines.append("=" * 80)
|
|
lines.append(f"\nDataset: {profile.name}")
|
|
lines.append(f"Generated: {datetime.now().isoformat()}")
|
|
lines.append(f"Rows: {profile.row_count:,}")
|
|
lines.append(f"Columns: {profile.column_count}")
|
|
lines.append(f"Duplicate Rows: {profile.duplicate_rows:,}")
|
|
|
|
# Quality Score
|
|
lines.append("\n" + "-" * 40)
|
|
lines.append("QUALITY SCORES")
|
|
lines.append("-" * 40)
|
|
lines.append(f" Overall: {score.overall:>6.1f}% {'✓' if score.overall >= 80 else '✗'}")
|
|
lines.append(f" Completeness: {score.completeness:>6.1f}%")
|
|
lines.append(f" Uniqueness: {score.uniqueness:>6.1f}%")
|
|
lines.append(f" Validity: {score.validity:>6.1f}%")
|
|
lines.append(f" Consistency: {score.consistency:>6.1f}%")
|
|
lines.append(f" Accuracy: {score.accuracy:>6.1f}%")
|
|
|
|
# Validation Results Summary
|
|
passed = sum(1 for r in results if r.passed)
|
|
failed = len(results) - passed
|
|
errors = sum(1 for r in results if not r.passed and r.severity == "error")
|
|
warnings = sum(1 for r in results if not r.passed and r.severity == "warning")
|
|
|
|
lines.append("\n" + "-" * 40)
|
|
lines.append("VALIDATION SUMMARY")
|
|
lines.append("-" * 40)
|
|
lines.append(f" Total Checks: {len(results)}")
|
|
lines.append(f" Passed: {passed} ✓")
|
|
lines.append(f" Failed: {failed} ✗")
|
|
lines.append(f" Errors: {errors}")
|
|
lines.append(f" Warnings: {warnings}")
|
|
|
|
# Failed checks details
|
|
if failed > 0:
|
|
lines.append("\n" + "-" * 40)
|
|
lines.append("FAILED CHECKS")
|
|
lines.append("-" * 40)
|
|
|
|
for r in results:
|
|
if not r.passed:
|
|
severity_icon = "❌" if r.severity == "error" else "⚠️"
|
|
col_str = f"[{r.column}]" if r.column else ""
|
|
lines.append(f"\n{severity_icon} {r.check_name} {col_str}")
|
|
lines.append(f" Expected: {r.expected}")
|
|
lines.append(f" Actual: {r.actual}")
|
|
if r.message:
|
|
lines.append(f" Message: {r.message}")
|
|
|
|
# Column profiles
|
|
lines.append("\n" + "-" * 40)
|
|
lines.append("COLUMN PROFILES")
|
|
lines.append("-" * 40)
|
|
|
|
for col in profile.columns:
|
|
lines.append(f"\n {col.name}")
|
|
lines.append(f" Type: {col.data_type}")
|
|
lines.append(f" Nulls: {col.null_count:,} ({col.null_percentage:.1f}%)")
|
|
lines.append(f" Unique: {col.unique_count:,} ({col.unique_percentage:.1f}%)")
|
|
|
|
if col.min_value is not None:
|
|
lines.append(f" Range: [{col.min_value:.2f}, {col.max_value:.2f}]")
|
|
lines.append(f" Mean: {col.mean:.2f}, Median: {col.median:.2f}")
|
|
|
|
if col.min_length is not None:
|
|
lines.append(f" Length: [{col.min_length}, {col.max_length}] (avg: {col.avg_length:.1f})")
|
|
|
|
if col.detected_pattern:
|
|
lines.append(f" Pattern: {col.detected_pattern}")
|
|
|
|
if col.top_values:
|
|
top_3 = col.top_values[:3]
|
|
lines.append(f" Top values: {', '.join(f'{v[0]} ({v[1]})' for v in top_3)}")
|
|
|
|
lines.append("\n" + "=" * 80)
|
|
|
|
return "\n".join(lines)
|
|
|
|
def generate_json_report(self,
|
|
profile: DataProfile,
|
|
results: List[ValidationResult],
|
|
score: QualityScore) -> Dict:
|
|
"""Generate a JSON report"""
|
|
return {
|
|
"report_type": "data_quality_validation",
|
|
"generated_at": datetime.now().isoformat(),
|
|
"dataset": {
|
|
"name": profile.name,
|
|
"row_count": profile.row_count,
|
|
"column_count": profile.column_count,
|
|
"duplicate_rows": profile.duplicate_rows,
|
|
"memory_bytes": profile.memory_size_bytes
|
|
},
|
|
"quality_score": asdict(score),
|
|
"validation_summary": {
|
|
"total_checks": len(results),
|
|
"passed": sum(1 for r in results if r.passed),
|
|
"failed": sum(1 for r in results if not r.passed),
|
|
"errors": sum(1 for r in results if not r.passed and r.severity == "error"),
|
|
"warnings": sum(1 for r in results if not r.passed and r.severity == "warning")
|
|
},
|
|
"validation_results": [
|
|
{
|
|
"check": r.check_name,
|
|
"column": r.column,
|
|
"passed": r.passed,
|
|
"severity": r.severity,
|
|
"expected": str(r.expected),
|
|
"actual": str(r.actual),
|
|
"message": r.message
|
|
}
|
|
for r in results
|
|
],
|
|
"column_profiles": [asdict(c) for c in profile.columns]
|
|
}
|
|
|
|
|
|
# =============================================================================
|
|
# Data Loader
|
|
# =============================================================================
|
|
|
|
class DataLoader:
|
|
"""Load data from various formats"""
|
|
|
|
@staticmethod
|
|
def load(file_path: str) -> List[Dict]:
|
|
"""Load data from file"""
|
|
path = Path(file_path)
|
|
|
|
if not path.exists():
|
|
raise FileNotFoundError(f"File not found: {file_path}")
|
|
|
|
suffix = path.suffix.lower()
|
|
|
|
if suffix == '.csv':
|
|
return DataLoader._load_csv(file_path)
|
|
elif suffix == '.json':
|
|
return DataLoader._load_json(file_path)
|
|
elif suffix == '.jsonl':
|
|
return DataLoader._load_jsonl(file_path)
|
|
else:
|
|
raise ValueError(f"Unsupported file format: {suffix}")
|
|
|
|
@staticmethod
|
|
def _load_csv(file_path: str) -> List[Dict]:
|
|
"""Load CSV file"""
|
|
data = []
|
|
with open(file_path, 'r', newline='', encoding='utf-8') as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
data.append(dict(row))
|
|
return data
|
|
|
|
@staticmethod
|
|
def _load_json(file_path: str) -> List[Dict]:
|
|
"""Load JSON file"""
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = json.load(f)
|
|
|
|
if isinstance(content, list):
|
|
return content
|
|
elif isinstance(content, dict):
|
|
# Check for common data keys
|
|
for key in ['data', 'records', 'rows', 'items']:
|
|
if key in content and isinstance(content[key], list):
|
|
return content[key]
|
|
return [content]
|
|
else:
|
|
raise ValueError("JSON must contain array or object with data key")
|
|
|
|
@staticmethod
|
|
def _load_jsonl(file_path: str) -> List[Dict]:
|
|
"""Load JSON Lines file"""
|
|
data = []
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if line:
|
|
data.append(json.loads(line))
|
|
return data
|
|
|
|
|
|
# =============================================================================
|
|
# Schema Loader
|
|
# =============================================================================
|
|
|
|
class SchemaLoader:
|
|
"""Load schema definitions"""
|
|
|
|
@staticmethod
|
|
def load(file_path: str) -> DataSchema:
|
|
"""Load schema from JSON file"""
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
schema_dict = json.load(f)
|
|
|
|
columns = []
|
|
for col_def in schema_dict.get('columns', []):
|
|
columns.append(ColumnSchema(
|
|
name=col_def['name'],
|
|
data_type=col_def.get('type', col_def.get('data_type', 'string')),
|
|
nullable=col_def.get('nullable', True),
|
|
unique=col_def.get('unique', False),
|
|
min_value=col_def.get('min_value'),
|
|
max_value=col_def.get('max_value'),
|
|
min_length=col_def.get('min_length'),
|
|
max_length=col_def.get('max_length'),
|
|
pattern=col_def.get('pattern'),
|
|
allowed_values=col_def.get('allowed_values'),
|
|
description=col_def.get('description', '')
|
|
))
|
|
|
|
return DataSchema(
|
|
name=schema_dict.get('name', 'unknown'),
|
|
version=schema_dict.get('version', '1.0'),
|
|
columns=columns,
|
|
primary_key=schema_dict.get('primary_key'),
|
|
row_count_min=schema_dict.get('row_count_min'),
|
|
row_count_max=schema_dict.get('row_count_max')
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# CLI Interface
|
|
# =============================================================================
|
|
|
|
def cmd_validate(args):
|
|
"""Run validation against schema"""
|
|
logger.info(f"Loading data from {args.input}")
|
|
data = DataLoader.load(args.input)
|
|
|
|
results = []
|
|
|
|
if args.schema:
|
|
logger.info(f"Loading schema from {args.schema}")
|
|
schema = SchemaLoader.load(args.schema)
|
|
|
|
validator = SchemaValidator()
|
|
results = validator.validate(data, schema)
|
|
|
|
if args.detect_anomalies:
|
|
logger.info("Running anomaly detection")
|
|
anomaly_detector = AnomalyDetector()
|
|
anomaly_results = anomaly_detector.validate(data)
|
|
results.extend(anomaly_results)
|
|
|
|
# Profile data
|
|
profiler = DataProfiler()
|
|
profile = profiler.profile(data, name=Path(args.input).stem)
|
|
|
|
# Calculate score
|
|
score_calc = QualityScoreCalculator()
|
|
score = score_calc.calculate(profile, results)
|
|
|
|
# Generate report
|
|
reporter = ReportGenerator()
|
|
|
|
if args.json:
|
|
report = reporter.generate_json_report(profile, results, score)
|
|
output = json.dumps(report, indent=2)
|
|
else:
|
|
output = reporter.generate_text_report(profile, results, score)
|
|
|
|
if args.output:
|
|
with open(args.output, 'w') as f:
|
|
f.write(output)
|
|
logger.info(f"Report saved to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
# Exit with error if validation failed
|
|
errors = sum(1 for r in results if not r.passed and r.severity == "error")
|
|
if errors > 0:
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_profile(args):
|
|
"""Generate data profile"""
|
|
logger.info(f"Loading data from {args.input}")
|
|
data = DataLoader.load(args.input)
|
|
|
|
profiler = DataProfiler()
|
|
profile = profiler.profile(data, name=Path(args.input).stem)
|
|
|
|
if args.json or args.output:
|
|
output = json.dumps(asdict(profile), indent=2, default=str)
|
|
else:
|
|
# Text output
|
|
lines = []
|
|
lines.append(f"Dataset: {profile.name}")
|
|
lines.append(f"Rows: {profile.row_count:,}")
|
|
lines.append(f"Columns: {profile.column_count}")
|
|
lines.append(f"Duplicate rows: {profile.duplicate_rows:,}")
|
|
lines.append(f"\nColumn Profiles:")
|
|
|
|
for col in profile.columns:
|
|
lines.append(f"\n {col.name} ({col.data_type})")
|
|
lines.append(f" Nulls: {col.null_percentage:.1f}%")
|
|
lines.append(f" Unique: {col.unique_percentage:.1f}%")
|
|
if col.mean is not None:
|
|
lines.append(f" Stats: min={col.min_value}, max={col.max_value}, mean={col.mean:.2f}")
|
|
|
|
output = "\n".join(lines)
|
|
|
|
if args.output:
|
|
with open(args.output, 'w') as f:
|
|
f.write(output)
|
|
logger.info(f"Profile saved to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
|
|
def cmd_generate_suite(args):
|
|
"""Generate Great Expectations suite"""
|
|
logger.info(f"Loading data from {args.input}")
|
|
data = DataLoader.load(args.input)
|
|
|
|
# Profile first
|
|
profiler = DataProfiler()
|
|
profile = profiler.profile(data, name=Path(args.input).stem)
|
|
|
|
# Generate suite
|
|
generator = GreatExpectationsGenerator()
|
|
suite = generator.generate_suite(profile)
|
|
|
|
output = json.dumps(suite, indent=2)
|
|
|
|
if args.output:
|
|
with open(args.output, 'w') as f:
|
|
f.write(output)
|
|
logger.info(f"Expectation suite saved to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
|
|
def cmd_contract(args):
|
|
"""Validate against data contract"""
|
|
logger.info(f"Loading data from {args.input}")
|
|
data = DataLoader.load(args.input)
|
|
|
|
logger.info(f"Loading contract from {args.contract}")
|
|
contract_validator = DataContractValidator()
|
|
contract = contract_validator.load_contract(args.contract)
|
|
|
|
results = contract_validator.validate_contract(data, contract)
|
|
|
|
# Profile data
|
|
profiler = DataProfiler()
|
|
profile = profiler.profile(data, name=Path(args.input).stem)
|
|
|
|
# Calculate score
|
|
score_calc = QualityScoreCalculator()
|
|
score = score_calc.calculate(profile, results)
|
|
|
|
# Generate report
|
|
reporter = ReportGenerator()
|
|
|
|
if args.json:
|
|
report = reporter.generate_json_report(profile, results, score)
|
|
output = json.dumps(report, indent=2)
|
|
else:
|
|
output = reporter.generate_text_report(profile, results, score)
|
|
|
|
if args.output:
|
|
with open(args.output, 'w') as f:
|
|
f.write(output)
|
|
logger.info(f"Report saved to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
# Exit with error if contract validation failed
|
|
errors = sum(1 for r in results if not r.passed and r.severity == "error")
|
|
if errors > 0:
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_schema(args):
|
|
"""Generate schema from data"""
|
|
logger.info(f"Loading data from {args.input}")
|
|
data = DataLoader.load(args.input)
|
|
|
|
if not data:
|
|
logger.error("Empty dataset")
|
|
sys.exit(1)
|
|
|
|
# Profile to detect types
|
|
profiler = DataProfiler()
|
|
profile = profiler.profile(data, name=Path(args.input).stem)
|
|
|
|
# Generate schema
|
|
schema = {
|
|
"name": profile.name,
|
|
"version": "1.0",
|
|
"columns": []
|
|
}
|
|
|
|
for col in profile.columns:
|
|
col_schema = {
|
|
"name": col.name,
|
|
"type": col.data_type,
|
|
"nullable": col.null_percentage > 0,
|
|
"description": ""
|
|
}
|
|
|
|
if col.unique_percentage > 99:
|
|
col_schema["unique"] = True
|
|
|
|
if col.min_value is not None:
|
|
col_schema["min_value"] = col.min_value
|
|
col_schema["max_value"] = col.max_value
|
|
|
|
if col.min_length is not None:
|
|
col_schema["min_length"] = col.min_length
|
|
col_schema["max_length"] = col.max_length
|
|
|
|
if col.detected_pattern:
|
|
col_schema["pattern"] = col.detected_pattern
|
|
|
|
# Add allowed values for low-cardinality columns
|
|
if col.unique_count <= 20 and col.unique_percentage < 10:
|
|
col_schema["allowed_values"] = [v[0] for v in col.top_values]
|
|
|
|
schema["columns"].append(col_schema)
|
|
|
|
output = json.dumps(schema, indent=2)
|
|
|
|
if args.output:
|
|
with open(args.output, 'w') as f:
|
|
f.write(output)
|
|
logger.info(f"Schema saved to {args.output}")
|
|
else:
|
|
print(output)
|
|
|
|
|
|
def main():
|
|
"""Main entry point"""
|
|
parser = argparse.ArgumentParser(
|
|
description="Data Quality Validator - Comprehensive data quality validation",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
# Validate data against schema
|
|
python data_quality_validator.py validate data.csv --schema schema.json
|
|
|
|
# Profile data
|
|
python data_quality_validator.py profile data.csv --output profile.json
|
|
|
|
# Generate Great Expectations suite
|
|
python data_quality_validator.py generate-suite data.csv --output expectations.json
|
|
|
|
# Validate against data contract
|
|
python data_quality_validator.py contract data.csv --contract contract.yaml
|
|
|
|
# Generate schema from data
|
|
python data_quality_validator.py schema data.csv --output schema.json
|
|
"""
|
|
)
|
|
|
|
parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
|
|
|
|
subparsers = parser.add_subparsers(dest='command', help='Command to run')
|
|
|
|
# Validate command
|
|
validate_parser = subparsers.add_parser('validate', help='Validate data against schema')
|
|
validate_parser.add_argument('input', help='Input data file (CSV, JSON, JSONL)')
|
|
validate_parser.add_argument('--schema', '-s', help='Schema file (JSON)')
|
|
validate_parser.add_argument('--output', '-o', help='Output report file')
|
|
validate_parser.add_argument('--json', action='store_true', help='Output as JSON')
|
|
validate_parser.add_argument('--detect-anomalies', action='store_true', help='Detect statistical anomalies')
|
|
validate_parser.set_defaults(func=cmd_validate)
|
|
|
|
# Profile command
|
|
profile_parser = subparsers.add_parser('profile', help='Generate data profile')
|
|
profile_parser.add_argument('input', help='Input data file')
|
|
profile_parser.add_argument('--output', '-o', help='Output profile file')
|
|
profile_parser.add_argument('--json', action='store_true', help='Output as JSON')
|
|
profile_parser.set_defaults(func=cmd_profile)
|
|
|
|
# Generate suite command
|
|
suite_parser = subparsers.add_parser('generate-suite', help='Generate Great Expectations suite')
|
|
suite_parser.add_argument('input', help='Input data file')
|
|
suite_parser.add_argument('--output', '-o', help='Output expectations file')
|
|
suite_parser.set_defaults(func=cmd_generate_suite)
|
|
|
|
# Contract command
|
|
contract_parser = subparsers.add_parser('contract', help='Validate against data contract')
|
|
contract_parser.add_argument('input', help='Input data file')
|
|
contract_parser.add_argument('--contract', '-c', required=True, help='Data contract file (YAML or JSON)')
|
|
contract_parser.add_argument('--output', '-o', help='Output report file')
|
|
contract_parser.add_argument('--json', action='store_true', help='Output as JSON')
|
|
contract_parser.set_defaults(func=cmd_contract)
|
|
|
|
# Schema command
|
|
schema_parser = subparsers.add_parser('schema', help='Generate schema from data')
|
|
schema_parser.add_argument('input', help='Input data file')
|
|
schema_parser.add_argument('--output', '-o', help='Output schema file')
|
|
schema_parser.set_defaults(func=cmd_schema)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.verbose:
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
|
|
if not args.command:
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
try:
|
|
args.func(args)
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
if args.verbose:
|
|
import traceback
|
|
traceback.print_exc()
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|