Files
claude-skills-reference/data-analysis/data-quality-auditor/scripts/outlier_detector.py
amitdhanda48 a6e4cdbbeb feat(data-analysis): data-quality-auditor
Adds a new data-quality-auditor skill with three stdlib-only Python tools:
- data_profiler.py: full dataset profile with DQS (0-100) across 5 dimensions
- missing_value_analyzer.py: MCAR/MAR/MNAR classification + imputation strategies
- outlier_detector.py: IQR, Z-score, and Modified Z-score (MAD) outlier detection

Validator: 86.4/100 (GOOD). Security audit: PASS (0 critical/high).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-31 23:14:13 -07:00

263 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
outlier_detector.py — Multi-method outlier detection for numeric columns.

Loads a CSV (stdlib only), coerces each selected column to floats, and
reports outliers per column with a heuristic risk assessment.

Methods:
    iqr     — Interquartile Range / Tukey fences (robust, non-parametric, default)
    zscore  — Standard Z-score (assumes normal distribution)
    mzscore — Modified Z-score via Median Absolute Deviation (robust to skew)

Usage:
    python3 outlier_detector.py --file data.csv
    python3 outlier_detector.py --file data.csv --method iqr
    python3 outlier_detector.py --file data.csv --method zscore --threshold 2.5
    python3 outlier_detector.py --file data.csv --columns col1,col2
    python3 outlier_detector.py --file data.csv --format json
"""
import argparse
import csv
import json
import math
import sys
# Case-insensitive markers treated as missing values (matched after strip()+lower()).
NULL_STRINGS = {"", "null", "none", "n/a", "na", "nan", "nil", "undefined", "missing"}
def load_csv(filepath: str) -> tuple[list[str], list[dict]]:
    """Read *filepath* as UTF-8 CSV and return (header names, row dicts)."""
    with open(filepath, newline="", encoding="utf-8") as handle:
        parser = csv.DictReader(handle)
        records = list(parser)
        names = parser.fieldnames or []
    return names, records
def is_null(val: str | None) -> bool:
    """Return True when *val* is a recognized missing-value marker.

    Accepts None as well: csv.DictReader fills missing trailing fields
    with None (restval), and the previous version raised AttributeError
    on such rows when it called .strip() unconditionally.
    """
    return val is None or val.strip().lower() in NULL_STRINGS
def to_float(val: str) -> float | None:
try:
return float(val.strip())
except (ValueError, AttributeError):
return None
def median(nums: list[float]) -> float:
    """Return the median of a non-empty list (mean of middle pair when even)."""
    ordered = sorted(nums)
    count = len(ordered)
    half = count // 2
    if count % 2:
        return ordered[half]
    return (ordered[half - 1] + ordered[half]) / 2
def percentile(nums: list[float], p: float) -> float:
    """Return the p-th percentile of *nums* via linear interpolation.

    Assumes a non-empty list; p is in [0, 100].
    """
    ordered = sorted(nums)
    count = len(ordered)
    if count == 1:
        return ordered[0]
    pos = p / 100 * (count - 1)
    base = int(pos)
    nxt = base + 1
    if nxt >= count:
        # p == 100 (or rounding) lands past the last element.
        return ordered[-1]
    weight = pos - base
    return ordered[base] + weight * (ordered[nxt] - ordered[base])
def mean(nums: list[float]) -> float:
    """Arithmetic mean of a non-empty list."""
    total = sum(nums)
    return total / len(nums)
def std(nums: list[float], mu: float) -> float:
    """Sample standard deviation (n-1 denominator) around precomputed mean *mu*.

    Returns 0.0 for fewer than two values, where the statistic is undefined.
    """
    count = len(nums)
    if count < 2:
        return 0.0
    squared_devs = [(x - mu) ** 2 for x in nums]
    return math.sqrt(sum(squared_devs) / (count - 1))
# --- Detection methods ---
def detect_iqr(nums: list[float], multiplier: float = 1.5) -> dict:
    """Tukey-fence outlier detection.

    Flags values outside [Q1 - multiplier*IQR, Q3 + multiplier*IQR] and
    returns summary stats plus up to 10 distinct outlier values (rounded,
    sorted). Assumes *nums* is non-empty.
    """
    def _pctl(s: list[float], p: float) -> float:
        # Linear-interpolation percentile over the pre-sorted list *s*.
        n = len(s)
        if n == 1:
            return s[0]
        pos = p / 100 * (n - 1)
        base = int(pos)
        if base + 1 >= n:
            return s[-1]
        return s[base] + (pos - base) * (s[base + 1] - s[base])

    ordered = sorted(nums)
    q1 = _pctl(ordered, 25)
    q3 = _pctl(ordered, 75)
    spread = q3 - q1
    lo_fence = q1 - multiplier * spread
    hi_fence = q3 + multiplier * spread
    flagged = [x for x in nums if not (lo_fence <= x <= hi_fence)]
    return {
        "method": "IQR",
        "q1": round(q1, 4),
        "q3": round(q3, 4),
        "iqr": round(spread, 4),
        "lower_bound": round(lo_fence, 4),
        "upper_bound": round(hi_fence, 4),
        "outlier_count": len(flagged),
        "outlier_pct": round(len(flagged) / len(nums) * 100, 2),
        "outlier_values": sorted(set(round(x, 4) for x in flagged))[:10],
    }
def detect_zscore(nums: list[float], threshold: float = 3.0) -> dict:
    """Standard Z-score outlier detection (assumes roughly normal data).

    Flags values whose |z| exceeds *threshold*; returns summary stats plus
    up to 10 distinct outlier values. Degenerate (zero-variance) input gets
    a short note instead of stats.
    """
    count = len(nums)
    mu = sum(nums) / count
    # Sample standard deviation (n-1); 0.0 when fewer than two values.
    if count < 2:
        sigma = 0.0
    else:
        sigma = math.sqrt(sum((x - mu) ** 2 for x in nums) / (count - 1))
    if sigma == 0:
        return {"method": "Z-score", "outlier_count": 0, "outlier_pct": 0.0,
                "note": "Zero variance — all values identical"}
    flagged = [x for x in nums if abs((x - mu) / sigma) > threshold]
    return {
        "method": "Z-score",
        "mean": round(mu, 4),
        "std": round(sigma, 4),
        "threshold": threshold,
        "outlier_count": len(flagged),
        "outlier_pct": round(len(flagged) / count * 100, 2),
        "outlier_values": sorted(set(round(x, 4) for x in flagged))[:10],
    }
def detect_modified_zscore(nums: list[float], threshold: float = 3.5) -> dict:
    """Iglewicz-Hoaglin modified Z-score via Median Absolute Deviation.

    Robust to skewed data. Flags values whose modified Z exceeds
    *threshold*; when MAD is zero the statistic is undefined, so a note
    is returned instead.
    """
    def _median(vals: list[float]) -> float:
        # Median of a non-empty list (mean of middle pair when even).
        s = sorted(vals)
        n = len(s)
        mid = n // 2
        return s[mid] if n % 2 else (s[mid - 1] + s[mid]) / 2

    center = _median(nums)
    mad = _median([abs(x - center) for x in nums])
    if mad == 0:
        return {"method": "Modified Z-score (MAD)", "outlier_count": 0, "outlier_pct": 0.0,
                "note": "MAD is zero — consider Z-score instead"}
    # 0.6745 ≈ Φ⁻¹(0.75): scales MAD to be consistent with the normal std.
    flagged = [x for x in nums if 0.6745 * abs(x - center) / mad > threshold]
    return {
        "method": "Modified Z-score (MAD)",
        "median": round(center, 4),
        "mad": round(mad, 4),
        "threshold": threshold,
        "outlier_count": len(flagged),
        "outlier_pct": round(len(flagged) / len(nums) * 100, 2),
        "outlier_values": sorted(set(round(x, 4) for x in flagged))[:10],
    }
def classify_outlier_risk(pct: float, col: str) -> str:
    """Heuristic: flag whether outliers are likely data errors or legitimate extremes.

    *col* is currently unused (kept for interface stability with callers
    that pass it positionally).
    """
    bands = (
        (10, "High outlier rate — likely systematic data quality issue or wrong data type"),
        (5, "Elevated outlier rate — investigate source; may be mixed populations"),
        (1, "Moderate — review individually; could be legitimate extremes or entry errors"),
        (0, "Low — verify extreme values against source; likely legitimate but worth checking"),
    )
    for cutoff, message in bands:
        if pct > cutoff:
            return message
    return "Clean — no outliers detected"
def analyze_column(col: str, nums: list[float], method: str, threshold: float) -> dict:
    """Run the selected detection *method* over one column's numeric values.

    Returns the method's stats dict augmented with the column name, the
    numeric count, and a heuristic risk assessment. Columns with fewer than
    4 numeric values are skipped (quartiles/deviation are meaningless).

    Fix: the IQR branch previously used
    ``threshold if threshold != 3.0 else 1.5``, which silently rewrote an
    explicitly requested multiplier of 3.0 to 1.5. main() already supplies
    the per-method default (1.5 for iqr), so the caller's value is now
    honored as-is.
    """
    if len(nums) < 4:
        return {"column": col, "status": "Skipped — fewer than 4 numeric values"}
    if method == "iqr":
        result = detect_iqr(nums, multiplier=threshold)
    elif method == "zscore":
        result = detect_zscore(nums, threshold=threshold)
    elif method == "mzscore":
        result = detect_modified_zscore(nums, threshold=threshold)
    else:
        # Unknown method name: fall back to IQR with its default fence.
        result = detect_iqr(nums)
    result["column"] = col
    result["total_numeric"] = len(nums)
    result["risk_assessment"] = classify_outlier_risk(result.get("outlier_pct", 0), col)
    return result
def print_report(results: list[dict]):
    """Render a human-readable outlier report for *results* to stdout."""
    bar = "=" * 64
    print(bar)
    print("OUTLIER DETECTION REPORT")
    print(bar)
    skipped = [r for r in results if "status" in r]
    flagged = [r for r in results if r.get("outlier_count", 0) > 0]
    clean = [r for r in results if "status" not in r and r.get("outlier_count", 0) == 0]
    print(f"\nColumns analyzed: {len(results) - len(skipped)}")
    print(f"Clean: {len(clean)}")
    print(f"Flagged: {len(flagged)}")
    if skipped:
        skipped_names = ", ".join(r["column"] for r in skipped)
        print(f"Skipped: {len(skipped)} ({skipped_names})")
    if flagged:
        rule = "-" * 64
        print("\n" + rule)
        print("FLAGGED COLUMNS")
        print(rule)
        # Worst columns (highest outlier percentage) first.
        for entry in sorted(flagged, key=lambda x: -x.get("outlier_pct", 0)):
            rate = entry.get("outlier_pct", 0)
            icon = "🔴" if rate > 5 else "🟡"
            print(f"\n {icon} {entry['column']} ({entry['method']})")
            print(f" Outliers: {entry['outlier_count']} / {entry['total_numeric']} rows ({rate}%)")
            # Method-specific stat lines, keyed by which fields the method emitted.
            if "lower_bound" in entry:
                print(f" Bounds: [{entry['lower_bound']}, {entry['upper_bound']}] | IQR: {entry['iqr']}")
            if "mean" in entry:
                print(f" Mean: {entry['mean']} | Std: {entry['std']} | Threshold: ±{entry['threshold']}σ")
            if "median" in entry:
                print(f" Median: {entry['median']} | MAD: {entry['mad']} | Threshold: {entry['threshold']}")
            if entry.get("outlier_values"):
                sample = ", ".join(str(v) for v in entry["outlier_values"][:8])
                print(f" Sample outlier values: {sample}")
            print(f" Assessment: {entry['risk_assessment']}")
    if clean:
        clean_names = ", ".join(r["column"] for r in clean)
        print(f"\n🟢 Clean columns: {clean_names}")
    print("\n" + bar)
def main():
    """CLI entry point: parse args, load the CSV, run detection per column.

    Exits with status 1 on unreadable/empty files or unknown column names.
    """
    parser = argparse.ArgumentParser(description="Detect outliers in numeric columns of a CSV dataset.")
    parser.add_argument("--file", required=True, help="Path to CSV file")
    parser.add_argument("--method", choices=["iqr", "zscore", "mzscore"], default="iqr",
                        help="Detection method (default: iqr)")
    parser.add_argument("--threshold", type=float, default=None,
                        help="Method threshold (IQR multiplier default 1.5; Z-score default 3.0; mzscore default 3.5)")
    parser.add_argument("--columns", help="Comma-separated columns to check (default: all numeric)")
    parser.add_argument("--format", choices=["text", "json"], default="text")
    args = parser.parse_args()
    # Each method has its own conventional default threshold.
    if args.threshold is None:
        args.threshold = {"iqr": 1.5, "zscore": 3.0, "mzscore": 3.5}[args.method]
    try:
        headers, rows = load_csv(args.file)
    except FileNotFoundError:
        print(f"Error: file not found: {args.file}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error reading file: {e}", file=sys.stderr)
        sys.exit(1)
    if not rows:
        print("Error: CSV file is empty.", file=sys.stderr)
        sys.exit(1)
    if args.columns:
        # Fix: strip whitespace and drop empty tokens so "--columns a, b"
        # and trailing commas work instead of failing the lookup below.
        selected = [c.strip() for c in args.columns.split(",") if c.strip()]
    else:
        selected = headers
    missing_cols = [c for c in selected if c not in headers]
    if missing_cols:
        print(f"Error: columns not found: {', '.join(missing_cols)}", file=sys.stderr)
        sys.exit(1)
    results = []
    for col in selected:
        # Fix: DictReader fills short rows with None (not the .get default),
        # which crashed is_null(); normalize falsy values to "".
        raw = [row.get(col) or "" for row in rows]
        nums = [n for v in raw if not is_null(v) and (n := to_float(v)) is not None]
        results.append(analyze_column(col, nums, args.method, args.threshold))
    if args.format == "json":
        print(json.dumps(results, indent=2))
    else:
        print_report(results)


if __name__ == "__main__":
    main()