
Data Ingestion Overview

The data pipeline handles CSV file ingestion, schema validation, data transformation, and quality assurance for both training and prediction workflows.
The system uses separate schemas for training (schema_training.json) and prediction (schema_prediction.json) to account for the presence/absence of the target variable fraud_reported.

Data Sources

Training Data

  • Input Location: Training_FileFromDB/InputFile.csv
  • Schema Reference: schema_training.json
  • Expected Columns: 39 (including the fraud_reported target)
Implementation:
from data_ingestion.data_loader import Data_Getter

data_getter = Data_Getter(file_object, logger_object)
data = data_getter.get_data()  # Returns pandas DataFrame
Reference: data_ingestion/data_loader.py:16-30

Prediction Data

  • Input Location: Specified via API request filepath
  • Schema Reference: schema_prediction.json
  • Expected Columns: 38 (excluding fraud_reported)
Implementation:
from data_ingestion.data_loader_prediction import Data_Getter_Pred

data_getter = Data_Getter_Pred(file_object, logger_object)
data = data_getter.get_data()  # Returns pandas DataFrame

Schema Validation

Training Schema Structure

The schema_training.json file defines the expected data format:
{
  "SampleFileName": "fraudDetection_021119920_010222.csv",
  "LengthOfDateStampInFile": 9,
  "LengthOfTimeStampInFile": 6,
  "NumberofColumns": 39,
  "ColName": {
    "months_as_customer": "Integer",
    "age": "Integer",
    "policy_number": "Integer",
    "policy_bind_date": "varchar",
    "policy_state": "varchar",
    "policy_csl": "varchar",
    "policy_deductable": "Integer",
    "policy_annual_premium": "Integer",
    "umbrella_limit": "Integer",
    "insured_zip": "Integer",
    "insured_sex": "varchar",
    "insured_education_level": "varchar",
    "insured_occupation": "varchar",
    "insured_hobbies": "varchar",
    "insured_relationship": "varchar",
    "capital-gains": "Integer",
    "capital-loss": "Integer",
    "incident_date": "varchar",
    "incident_type": "varchar",
    "collision_type": "varchar",
    "incident_severity": "varchar",
    "authorities_contacted": "varchar",
    "incident_state": "varchar",
    "incident_city": "varchar",
    "incident_location": "varchar",
    "incident_hour_of_the_day": "Integer",
    "number_of_vehicles_involved": "Integer",
    "property_damage": "varchar",
    "bodily_injuries": "Integer",
    "witnesses": "Integer",
    "police_report_available": "varchar",
    "total_claim_amount": "Integer",
    "injury_claim": "Integer",
    "property_claim": "Integer",
    "vehicle_claim": "Integer",
    "auto_make": "varchar",
    "auto_model": "varchar",
    "auto_year": "Integer",
    "fraud_reported": "varchar"
  }
}
Reference: schema_training.json

Validation Rules

File Name Format: Files must follow the pattern fraudDetection_{DateStamp}_{TimeStamp}.csv
  • DateStamp Length: 9 characters
  • TimeStamp Length: 6 characters
  • Example: fraudDetection_021119920_010222.csv
Column Count:
  • Training: Must have exactly 39 columns
  • Prediction: Must have exactly 38 columns (no fraud_reported)
Files with incorrect column counts are rejected and moved to archive.
Each column’s data type is validated against the schema:
  • Integer: Numeric values (can contain NaN)
  • varchar: String values (categorical or text)
Type mismatches trigger validation errors.
Column names must exactly match the schema definition. Missing or extra columns cause validation failure.
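These rules can be expressed as a small validator. The helper names below are illustrative, not the actual validation module's API; they sketch the file-name and column-count checks under the assumption that both are pure functions of the file name and header row.

```python
import re

# fraudDetection_{9-digit DateStamp}_{6-digit TimeStamp}.csv
FILENAME_PATTERN = re.compile(r"^fraudDetection_(\d{9})_(\d{6})\.csv$")

def is_valid_filename(filename: str) -> bool:
    """Check the fraudDetection_{DateStamp}_{TimeStamp}.csv pattern."""
    return FILENAME_PATTERN.match(filename) is not None

def has_expected_columns(columns, expected_count: int) -> bool:
    """Reject files whose column count differs from the schema."""
    return len(columns) == expected_count
```

In the real pipeline these checks run before type validation, so a badly named or badly shaped file is archived without ever being parsed column by column.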

Validation Modules

Module: DataTypeValidation_Insertion_Training/
Workflow:
  1. Read schema from schema_training.json
  2. Validate file name format
  3. Check column count (39 expected)
  4. Validate data types per column
  5. Move valid files to processing queue
  6. Archive bad data to TrainingArchiveBadData/
Orchestrator: training_Validation_Insertion.py

Data Transformation Steps

The preprocessing pipeline applies identical transformations to training and prediction data:

Step 1: Column Removal

Method: Preprocessor.remove_columns()
Columns Removed: 14 features that don’t contribute to prediction
columns_to_remove = [
    'policy_number',      # Unique identifier, no predictive value
    'policy_bind_date',   # Temporal feature, removed to avoid leakage
    'policy_state',       # High cardinality categorical
    'insured_zip',        # High cardinality, location proxy
    'incident_location',  # Free text, high cardinality
    'incident_date',      # Temporal, redundant with timestamp
    'incident_state',     # High cardinality
    'incident_city',      # High cardinality
    'insured_hobbies',    # Free text, noisy
    'auto_make',          # High cardinality
    'auto_model',         # High cardinality
    'auto_year',          # Continuous but removed
    'age',                # Redundant with months_as_customer
    'total_claim_amount'  # Aggregate of injury/property/vehicle claims
]
Reference: trainingModel.py:40, predictFromModel.py:30-34
Removing total_claim_amount prevents data leakage since it aggregates the individual claim amounts already included as features.
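A minimal sketch of the removal step, assuming it reduces to a pandas drop (the function name here is illustrative, not the actual Preprocessor method signature):

```python
import pandas as pd

def remove_columns(data: pd.DataFrame, columns) -> pd.DataFrame:
    """Drop non-predictive columns, ignoring any that are absent."""
    return data.drop(columns=[c for c in columns if c in data.columns])
```

Filtering to columns actually present makes the same helper safe for both training frames (39 columns) and prediction frames (38 columns).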

Step 2: Missing Value Handling

Methods: Preprocessor.is_null_present() and Preprocessor.impute_missing_values()
Process:
  1. Detection: Identify columns with missing values (NaN)
  2. Logging: Save null value report to preprocessing_data/null_values.csv
  3. Imputation: Use CategoricalImputer for all columns with missing data
import numpy as np

data.replace('?', np.nan, inplace=True)  # Source files encode missing values as '?'

is_null_present, cols_with_missing_values = preprocessor.is_null_present(data)

if is_null_present:
    data = preprocessor.impute_missing_values(data, cols_with_missing_values)
Reference: data_preprocessing/preprocessing.py:97-155
The system uses CategoricalImputer which fills missing values with the most frequent category, appropriate for the predominantly categorical dataset.
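The most-frequent strategy can be sketched in plain pandas; this is an equivalent stand-in for CategoricalImputer's default behavior, not the library call itself:

```python
import pandas as pd

def impute_most_frequent(data: pd.DataFrame, cols) -> pd.DataFrame:
    """Fill NaNs in each listed column with its most frequent value (mode)."""
    data = data.copy()
    for col in cols:
        data[col] = data[col].fillna(data[col].mode()[0])
    return data
```

Note that `mode()` ignores NaN by default, so the fill value is always an observed category.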

Step 3: Categorical Encoding

Method: Preprocessor.encode_categorical_columns()
Encoding Strategy: Hybrid approach using label encoding + one-hot encoding

Label Encoding (Ordinal Features)

encoding_mappings = {
    'policy_csl': {'100/300': 1, '250/500': 2.5, '500/1000': 5},
    'insured_education_level': {
        'JD': 1, 'High School': 2, 'College': 3, 
        'Masters': 4, 'Associate': 5, 'MD': 6, 'PhD': 7
    },
    'incident_severity': {
        'Trivial Damage': 1, 'Minor Damage': 2, 
        'Major Damage': 3, 'Total Loss': 4
    },
    'insured_sex': {'FEMALE': 0, 'MALE': 1},
    'property_damage': {'NO': 0, 'YES': 1},
    'police_report_available': {'NO': 0, 'YES': 1},
    'fraud_reported': {'N': 0, 'Y': 1}  # Training only
}
Reference: data_preprocessing/preprocessing.py:207-217
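Applying these mappings is a per-column `Series.map`. The helper below is a sketch, not the actual Preprocessor code, shown with a two-column subset of the mappings above:

```python
import pandas as pd

# Subset of the ordinal mappings defined above, for illustration
encoding_mappings = {
    'insured_sex': {'FEMALE': 0, 'MALE': 1},
    'property_damage': {'NO': 0, 'YES': 1},
}

def apply_label_encodings(data: pd.DataFrame, mappings) -> pd.DataFrame:
    """Map each ordinal column's categories to its numeric codes."""
    data = data.copy()
    for col, mapping in mappings.items():
        if col in data.columns:
            data[col] = data[col].map(mapping)
    return data
```

The `if col in data.columns` guard lets the same mapping dictionary serve prediction data, where fraud_reported is absent.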

One-Hot Encoding (Nominal Features)

Remaining categorical columns are one-hot encoded:
  • insured_occupation
  • insured_relationship
  • incident_type
  • collision_type
  • authorities_contacted
Implementation:
for col in cat_df.drop(columns=cols_to_drop).columns:
    cat_df = pd.get_dummies(cat_df, columns=[col], 
                            prefix=[col], drop_first=True)
Reference: data_preprocessing/preprocessing.py:226-227
drop_first=True prevents multicollinearity by dropping one category per feature.

Step 4: Feature Scaling

Method: Preprocessor.scale_numerical_columns()
Scaler: StandardScaler (z-score normalization)
Numerical Features Scaled:
numerical_features = [
    'months_as_customer',
    'policy_deductable',
    'umbrella_limit',
    'capital-gains',
    'capital-loss',
    'incident_hour_of_the_day',
    'number_of_vehicles_involved',
    'bodily_injuries',
    'witnesses',
    'injury_claim',
    'property_claim',
    'vehicle_claim'
]
Transformation:
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data[numerical_features])
# Each scaled column has mean=0, std=1
Reference: data_preprocessing/preprocessing.py:156-190
Training vs Prediction: In production, the scaler should be fit on training data and only transform prediction data to prevent data leakage. The current implementation fits on each dataset independently.
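One way to close that gap is to persist the scaler fitted at training time and reload it at prediction time. The function names and pickle path below are illustrative, not part of the current codebase:

```python
import pickle
import pandas as pd
from sklearn.preprocessing import StandardScaler

def fit_and_save_scaler(train_df, numerical_features, path):
    """Fit the scaler on training data only and persist it for prediction time."""
    scaler = StandardScaler()
    train_df[numerical_features] = scaler.fit_transform(train_df[numerical_features])
    with open(path, "wb") as f:
        pickle.dump(scaler, f)
    return train_df

def load_and_apply_scaler(pred_df, numerical_features, path):
    """Transform prediction data with the training-time statistics (no refit)."""
    with open(path, "rb") as f:
        scaler = pickle.load(f)
    pred_df[numerical_features] = scaler.transform(pred_df[numerical_features])
    return pred_df
```

Because only `transform` runs on prediction data, its mean and variance never influence the scaling parameters.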

Step 5: Label Separation (Training Only)

Method: Preprocessor.separate_label_feature()
X, Y = preprocessor.separate_label_feature(data, label_column_name='fraud_reported')
# X: Features DataFrame
# Y: Target Series (0=N, 1=Y)
Reference: data_preprocessing/preprocessing.py:74-95

Data Quality Checks

Missing Value Report

Output: preprocessing_data/null_values.csv
Content:
columns,missing values count
months_as_customer,0
policy_csl,0
collision_type,178
property_damage,360
police_report_available,360
...
Reference: data_preprocessing/preprocessing.py:121-124

Elbow Plot

Output: preprocessing_data/K-Means_Elbow.PNG
Purpose: Visual confirmation of optimal cluster count
Method: K-Means WCSS (Within-Cluster Sum of Squares) analysis
from sklearn.cluster import KMeans
from kneed import KneeLocator

wcss = []
for i in range(1, 11):
    kmeans = KMeans(n_clusters=i, init='k-means++', random_state=42)
    kmeans.fit(data)
    wcss.append(kmeans.inertia_)  # Within-cluster sum of squares

# Find the elbow (point of diminishing returns) using KneeLocator
kn = KneeLocator(range(1, 11), wcss, curve='convex', direction='decreasing')
optimal_clusters = kn.knee
Reference: data_preprocessing/clustering.py:19-47

Output Formats

Training Output

Model Files: Saved to models/ directory (via File_Operation)
Naming Convention:
  • K-Means model: KMeans
  • Cluster models: XGBoost0, SVM1, etc.
Format: Pickled scikit-learn/XGBoost objects
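Persisting a model under this convention is essentially a pickle dump keyed by the model name. This sketch assumes a flat `{name}.pkl` layout; the actual File_Operation class may organize the models/ directory differently:

```python
import os
import pickle

def save_model(model, model_name: str, model_dir: str = "models") -> str:
    """Pickle a trained model as {model_dir}/{model_name}.pkl and return the path."""
    os.makedirs(model_dir, exist_ok=True)
    path = os.path.join(model_dir, model_name + ".pkl")
    with open(path, "wb") as f:
        pickle.dump(model, f)
    return path
```

At prediction time the cluster index selects the file to unpickle (e.g. XGBoost0 for cluster 0).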

Prediction Output

File: Prediction_Output_File/Predictions.csv
Format:
Predictions
N
Y
N
N
Y
...
Mapping:
  • N - Not fraudulent (model output = 0)
  • Y - Fraudulent (model output = 1)
Reference: predictFromModel.py:64-71
The prediction file is created with mode='a+' which appends results. The deletePredictionFile() method ensures old predictions are cleared before new runs.
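The clear-then-write behavior can be sketched as follows; the function name is illustrative and it deletes any stale file up front rather than relying on append mode:

```python
import csv
import os

LABELS = {0: "N", 1: "Y"}  # Model output -> file label

def write_predictions(preds, path="Prediction_Output_File/Predictions.csv"):
    """Clear any previous output, then write one N/Y label per prediction."""
    dirname = os.path.dirname(path)
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    if os.path.exists(path):
        os.remove(path)  # mirrors the deletePredictionFile() cleanup step
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Predictions"])
        writer.writerows([[LABELS[p]] for p in preds])
```

Deleting before writing guarantees a run never mixes fresh predictions with a previous batch.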

Data Pipeline Error Handling

Logging Strategy

All pipeline operations are logged via App_Logger:
self.logger_object.log(self.file_object, 'Entered the get_data method of the Data_Getter class')
try:
    data = pd.read_csv(self.training_file)
    self.logger_object.log(self.file_object, 'Data Load Successful')
    return data
except Exception as e:
    self.logger_object.log(self.file_object, f'Exception occurred: {str(e)}')
    raise
Log Locations:
  • Training: Training_Logs/ModelTrainingLog.txt
  • Prediction: Prediction_Logs/Prediction_Log.txt

Bad Data Handling

  • Training: Files failing validation are moved to TrainingArchiveBadData/ with subdirectories by date
  • Prediction: Bad files are logged and rejected without archiving
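The date-stamped archiving step amounts to a move into a dated subdirectory. This is a sketch with an assumed `%Y-%m-%d` directory format; the actual archiver's naming may differ:

```python
import os
import shutil
from datetime import datetime

def archive_bad_file(filepath, archive_root="TrainingArchiveBadData"):
    """Move a file that failed validation into a date-stamped archive subdirectory."""
    date_dir = os.path.join(archive_root, datetime.now().strftime("%Y-%m-%d"))
    os.makedirs(date_dir, exist_ok=True)
    dest = os.path.join(date_dir, os.path.basename(filepath))
    shutil.move(filepath, dest)
    return dest
```

Grouping bad files by date keeps each validation run's rejects auditable without cluttering the ingestion directory.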

Performance Considerations

Batch Processing

CSV files are processed in batch mode, allowing efficient handling of thousands of claims simultaneously.

Memory Management

Pandas DataFrames are used throughout, providing efficient memory usage for tabular data operations.

Preprocessing Efficiency

StandardScaler and one-hot encoding are vectorized operations, ensuring fast transformation.

Model Loading

Models are loaded once per prediction request and cached within the request lifecycle.

Next Steps

Fraud Detection

Learn about the fraud detection methodology and model approach

Architecture

Explore the technical architecture and system components
