Documentation Index
Fetch the complete documentation index at: https://mintlify.com/RaviTejaMedarametla/nba-data-preprocessing/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The DataValidator class performs comprehensive data quality checks including missing value analysis, outlier detection, distribution drift detection, schema validation, and temporal drift analysis.
Class Definition
class DataValidator:
def __init__(self)
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:20
No configuration parameters required.
Methods
quality_report
def quality_report(
self,
df: pd.DataFrame,
outlier_mask: pd.Series
) -> ValidationReport
Generates a comprehensive data quality report.
Parameters:
- df: DataFrame to validate (typically cleaned data)
- outlier_mask: Boolean Series indicating outlier rows (from Preprocessor.detect_outliers_iqr())
Returns:
ValidationReport dataclass containing:
missing_ratio (dict): Proportion of missing values per column
outlier_rate (float): Proportion of rows flagged as outliers
drift_score (float): Set to 0.0 (computed separately via drift_detection())
schema_ok (bool): Set to True (computed separately via schema_validation())
schema_issues (list): Empty list (populated by schema_validation())
feature_drift (dict): Empty dict (computed separately via feature_wise_drift())
temporal_drift (dict): Empty dict (computed separately via temporal_drift())
Example:
from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd
preprocessor = Preprocessor()
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))
quality = validator.quality_report(cleaned, outlier_mask)
print(f"Outlier rate: {quality.outlier_rate:.2%}")
print(f"\nMissing values per column:")
for col, ratio in quality.missing_ratio.items():
if ratio > 0:
print(f" {col}: {ratio:.2%}")
schema_validation
def schema_validation(
self,
df: pd.DataFrame,
required_columns: list[str]
) -> tuple[bool, list[str]]
Validates that the DataFrame contains the required columns and the expected data types.
Parameters:
- df: DataFrame to validate
- required_columns: List of column names that must be present
Returns:
Tuple of (is_valid, issues) where:
is_valid (bool): True if all checks pass
issues (list[str]): List of issue descriptions (empty if valid)
Checks performed:
- Missing required columns → "missing_columns:[col1, col2, ...]"
- Non-numeric salary column → "salary_column_not_numeric"
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
schema_ok, issues = validator.schema_validation(
df,
required_columns=['version', 'salary', 'b_day', 'draft_year']
)
if schema_ok:
print("✓ Schema validation passed")
else:
print("✗ Schema validation failed:")
for issue in issues:
print(f" - {issue}")
# Example with missing columns
incomplete_df = df.drop(columns=['salary'])
schema_ok, issues = validator.schema_validation(
incomplete_df,
required_columns=['version', 'salary', 'b_day']
)
print(issues) # ['missing_columns:[salary]']
drift_detection
def drift_detection(
self,
baseline: pd.DataFrame,
candidate: pd.DataFrame,
numeric_col: str = 'salary'
) -> float
Detects distribution drift using the Kolmogorov-Smirnov (KS) statistic.
Parameters:
- baseline: Baseline (reference) DataFrame
- candidate: Candidate (current) DataFrame to compare against the baseline
- numeric_col: Name of the numeric column to compare (default: 'salary')
Returns:
Kolmogorov-Smirnov statistic (0.0-1.0):
- 0.0: Identical distributions
- 0.0-0.1: Minimal drift
- 0.1-0.3: Moderate drift
- > 0.3: Significant drift
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
# Load baseline and new data
baseline_df = pd.read_csv('nba_2020.csv')
candidate_df = pd.read_csv('nba_2023.csv')
# Check salary drift
drift_score = validator.drift_detection(baseline_df, candidate_df, numeric_col='salary')
print(f"Salary drift score: {drift_score:.4f}")
if drift_score < 0.1:
print("✓ No significant drift detected")
elif drift_score < 0.3:
print("⚠ Moderate drift detected - review data")
else:
print("✗ Significant drift detected - data may be incompatible")
# Check drift in other columns
for col in ['age', 'height', 'weight']:
if col in baseline_df.columns and col in candidate_df.columns:
drift = validator.drift_detection(baseline_df, candidate_df, numeric_col=col)
print(f"{col} drift: {drift:.4f}")
feature_wise_drift
def feature_wise_drift(
self,
baseline: pd.DataFrame,
candidate: pd.DataFrame
) -> dict
Computes drift metrics for each feature individually.
Returns:
Dictionary with one entry per common column.
For numeric columns:
{
'column_name': {
'type': 'numeric',
'ks_like': float, # KS statistic
'mean_shift': float # candidate_mean - baseline_mean
}
}
For categorical columns:
{
'column_name': {
'type': 'categorical',
'l1_drift': float # L1 distance between distributions
}
}
Example:
from pipeline.validation import DataValidator
import pandas as pd
import json
validator = DataValidator()
baseline = pd.read_csv('nba_2020.csv')
candidate = pd.read_csv('nba_2023.csv')
drift_report = validator.feature_wise_drift(baseline, candidate)
print("Feature-wise drift report:")
print(json.dumps(drift_report, indent=2))
# Identify high-drift features
for feature, metrics in drift_report.items():
if metrics['type'] == 'numeric':
if metrics['ks_like'] > 0.3:
print(f"⚠ High drift in {feature}:")
print(f" KS statistic: {metrics['ks_like']:.4f}")
print(f" Mean shift: {metrics['mean_shift']:+.2f}")
else: # categorical
if metrics['l1_drift'] > 0.5:
print(f"⚠ High drift in {feature} (categorical):")
print(f" L1 distance: {metrics['l1_drift']:.4f}")
temporal_drift
def temporal_drift(
self,
df: pd.DataFrame,
time_col: str = 'version'
) -> dict
Analyzes temporal drift by comparing early vs late time periods.
Parameters:
- df: DataFrame with temporal data
- time_col: Column name containing version/year information (the year is extracted with the regex r'(\d+)$'; default: 'version')
Returns:
If there is insufficient data:
{'status': 'insufficient_data'}
Otherwise:
{
'early_vs_late_salary_drift': float, # KS statistic
'early_mean_salary': float,
'late_mean_salary': float
}
Example:
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
df = pd.read_csv('nba_historical_data.csv')
temporal_report = validator.temporal_drift(df, time_col='version')
if 'status' in temporal_report:
print(f"Cannot analyze: {temporal_report['status']}")
else:
print("Temporal drift analysis:")
print(f" Early period mean salary: ${temporal_report['early_mean_salary']:,.2f}")
print(f" Late period mean salary: ${temporal_report['late_mean_salary']:,.2f}")
print(f" Salary drift: {temporal_report['early_vs_late_salary_drift']:.4f}")
# Calculate change
change_pct = (
(temporal_report['late_mean_salary'] - temporal_report['early_mean_salary'])
/ temporal_report['early_mean_salary'] * 100
)
print(f" Salary change: {change_pct:+.1f}%")
ValidationReport
@dataclass
class ValidationReport:
missing_ratio: dict
outlier_rate: float
drift_score: float
schema_ok: bool
schema_issues: list[str]
feature_drift: dict
temporal_drift: dict
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/validation/checks.py:9
Dataclass encapsulating comprehensive validation results.
Usage Patterns
Complete Validation Pipeline
from pipeline.preprocessing import Preprocessor
from pipeline.validation import DataValidator
import pandas as pd
from dataclasses import asdict
import json
preprocessor = Preprocessor()
validator = DataValidator()
df = pd.read_csv('nba_data.csv')
cleaned = preprocessor.clean(df)
# Quality report
outlier_mask = preprocessor.detect_outliers_iqr(cleaned.select_dtypes(include='number'))
quality = validator.quality_report(cleaned, outlier_mask)
# Schema validation
schema_ok, schema_issues = validator.schema_validation(
df,
required_columns=['version', 'salary', 'b_day', 'draft_year']
)
# Drift detection (compare to shuffled data as test)
drift_score = validator.drift_detection(
cleaned,
cleaned.sample(frac=1.0, random_state=42)
)
# Complete report
full_report = asdict(quality)
full_report['drift_score'] = drift_score
full_report['schema_ok'] = schema_ok
full_report['schema_issues'] = schema_issues
print(json.dumps(full_report, indent=2))
Monitoring Production Data
from pipeline.validation import DataValidator
import pandas as pd
validator = DataValidator()
# Load baseline (training data)
baseline = pd.read_csv('training_data.csv')
# Load production data
production = pd.read_csv('production_data_latest.csv')
# Comprehensive drift analysis
overall_drift = validator.drift_detection(baseline, production, 'salary')
feature_drift = validator.feature_wise_drift(baseline, production)
temporal_drift = validator.temporal_drift(production)
# Alert on significant drift
if overall_drift > 0.3:
print(f"🚨 ALERT: Significant drift detected ({overall_drift:.4f})")
print("\nPer-feature analysis:")
for feature, metrics in feature_drift.items():
if metrics.get('ks_like', 0) > 0.3 or metrics.get('l1_drift', 0) > 0.5:
print(f" High drift in {feature}: {metrics}")
Schema Validation in ETL
from pipeline.validation import DataValidator
import pandas as pd
import sys
validator = DataValidator()
REQUIRED_COLUMNS = [
'version', 'name', 'team', 'position',
'height', 'weight', 'b_day', 'salary',
'country', 'draft_year', 'draft_round', 'draft_peak'
]
def validate_input_file(file_path: str) -> bool:
try:
df = pd.read_csv(file_path)
schema_ok, issues = validator.schema_validation(df, REQUIRED_COLUMNS)
if not schema_ok:
print(f"❌ Validation failed for {file_path}:")
for issue in issues:
print(f" - {issue}")
return False
print(f"✅ Validation passed for {file_path}")
return True
except Exception as e:
print(f"❌ Error loading {file_path}: {e}")
return False
if __name__ == '__main__':
files_to_validate = ['nba_2021.csv', 'nba_2022.csv', 'nba_2023.csv']
results = [validate_input_file(f) for f in files_to_validate]
if not all(results):
sys.exit(1) # Fail ETL pipeline
Notes
- KS statistic ranges from 0.0 (identical) to 1.0 (completely different)
- L1 drift for categoricals is the sum of absolute differences in category proportions
- Temporal drift splits data at the midpoint after sorting by extracted year
- Schema validation automatically handles the $ prefix in the salary column
- Year extraction expects the format "NBA2K{YY}" and handles both 2-digit and 4-digit years
- All drift methods return 0.0 for empty or invalid data rather than raising errors
- Missing values are handled gracefully by calling .dropna() before drift computation