Skip to main content

Overview

This guide covers how to load trained models, prepare input data, and make AQI predictions in various deployment scenarios, from batch processing to real-time API endpoints.

Prerequisites

Ensure you have:
  • Trained models saved from the training guide
  • Feature scaler from data preparation
  • Clear understanding of required input features

Loading Trained Models

Step 1: Load Model and Artifacts

Load your saved model and preprocessing artifacts.
import xgboost as xgb
import joblib
import json

# Restore the trained XGBoost booster from its JSON checkpoint.
xgb_model = xgb.Booster()
xgb_model.load_model('models/xgboost_model.json')

# Restore the feature scaler fitted during data preparation.
scaler = joblib.load('models/feature_scaler.pkl')

# The metadata file records the exact feature set (and order) the model
# was trained on; predictions must reproduce that ordering.
with open('models/metadata.json', 'r') as f:
    metadata = json.load(f)
feature_names = metadata['features']

print(f"Model loaded: {len(feature_names)} features")
print(f"Expected features: {feature_names[:5]}...")
Step 2: Create Prediction Pipeline

Build a reusable pipeline for preprocessing and prediction.
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

class AQIPredictor:
    """Reusable AQI prediction pipeline.

    Bundles a trained model (XGBoost, LightGBM, scikit-learn, or Keras),
    the feature scaler fitted during training, and the ordered feature
    list, and handles feature engineering for raw sensor readings.
    """

    def __init__(self, model, scaler, feature_names):
        self.model = model
        self.scaler = scaler
        self.feature_names = feature_names
        self.model_type = self._detect_model_type()

    def _detect_model_type(self):
        """Detect model framework by duck-typing its public API."""
        if hasattr(self.model, 'get_score'):
            return 'xgboost'
        elif hasattr(self.model, 'num_trees'):
            return 'lightgbm'
        elif hasattr(self.model, 'predict_proba'):
            return 'sklearn'
        elif hasattr(self.model, 'layers'):
            return 'keras'
        elif hasattr(self.model, 'predict'):
            # Fix: sklearn *regressors* expose predict() but not
            # predict_proba(), so without this fallback they were reported
            # as 'unknown' and predict() raised ValueError.
            return 'sklearn'
        else:
            return 'unknown'

    def prepare_features(self, raw_data, historical_data=None):
        """
        Transform raw input into model features.

        Args:
            raw_data: Dict with current readings (pollutants, weather,
                optional 'timestamp').
            historical_data: Optional DataFrame with past readings
                ('pm25'/'pm10' columns, assumed hourly) for lag features.

        Returns:
            Single-row DataFrame with columns in self.feature_names order.
        """
        df = pd.DataFrame([raw_data])

        # Temporal features from the reading's timestamp (now() if absent).
        if 'timestamp' in raw_data:
            ts = pd.to_datetime(raw_data['timestamp'])
        else:
            ts = pd.Timestamp.now()

        df['hour'] = ts.hour
        df['day_of_week'] = ts.dayofweek
        df['month'] = ts.month
        df['is_weekend'] = int(ts.dayofweek in [5, 6])

        # Cyclical encoding so hour 23 and hour 0 are adjacent.
        df['hour_sin'] = np.sin(2 * np.pi * ts.hour / 24)
        df['hour_cos'] = np.cos(2 * np.pi * ts.hour / 24)

        # Season index: 0=winter, 1=spring, 2=summer, 3=autumn.
        season_map = {
            12: 0, 1: 0, 2: 0,
            3: 1, 4: 1, 5: 1,
            6: 2, 7: 2, 8: 2,
            9: 3, 10: 3, 11: 3
        }
        df['season'] = season_map[ts.month]

        # Lag features: the value observed `lag` rows ago.
        if historical_data is not None:
            for lag in [1, 3, 6, 12, 24]:
                if len(historical_data) >= lag:
                    df[f'pm25_lag_{lag}h'] = historical_data['pm25'].iloc[-lag]
                    df[f'pm10_lag_{lag}h'] = historical_data['pm10'].iloc[-lag]
                else:
                    # Use current value if insufficient history
                    df[f'pm25_lag_{lag}h'] = raw_data.get('pm25', 0)
                    df[f'pm10_lag_{lag}h'] = raw_data.get('pm10', 0)

            # Rolling statistics over the most recent `window` rows.
            for window in [3, 6, 12, 24]:
                if len(historical_data) >= window:
                    df[f'pm25_rolling_mean_{window}h'] = historical_data['pm25'].tail(window).mean()
                    df[f'pm25_rolling_std_{window}h'] = historical_data['pm25'].tail(window).std()
                else:
                    df[f'pm25_rolling_mean_{window}h'] = raw_data.get('pm25', 0)
                    df[f'pm25_rolling_std_{window}h'] = 0
        else:
            # No history: seed lags/rolling stats with the current values.
            for lag in [1, 3, 6, 12, 24]:
                df[f'pm25_lag_{lag}h'] = raw_data.get('pm25', 0)
                df[f'pm10_lag_{lag}h'] = raw_data.get('pm10', 0)
            for window in [3, 6, 12, 24]:
                df[f'pm25_rolling_mean_{window}h'] = raw_data.get('pm25', 0)
                df[f'pm25_rolling_std_{window}h'] = 0

        # Interaction features. Fix: read pm25/pm10 via raw_data.get() like
        # every other interaction here — the old df['pm25'] / df['pm10']
        # raised KeyError whenever those keys were missing from raw_data.
        df['pm_ratio'] = raw_data.get('pm25', 0) / (raw_data.get('pm10', 0) + 1e-5)
        df['temp_humidity'] = raw_data.get('temperature', 20) * raw_data.get('humidity', 50)
        df['pollutant_index'] = (
            raw_data.get('pm25', 0) * 0.3 +
            raw_data.get('pm10', 0) * 0.2 +
            raw_data.get('no2', 0) * 0.2 +
            raw_data.get('so2', 0) * 0.15 +
            raw_data.get('co', 0) * 0.15
        )

        # Zero-fill any expected feature not produced above so the column
        # set always matches what the scaler/model were trained on.
        for feat in self.feature_names:
            if feat not in df.columns:
                df[feat] = 0

        # Select and order features exactly as during training.
        df = df[self.feature_names]

        return df

    def predict(self, raw_data, historical_data=None, return_details=False):
        """
        Make an AQI prediction.

        Args:
            raw_data: Dict with current sensor readings.
            historical_data: Optional DataFrame with historical readings.
            return_details: If True, return a dict with category/timestamp.

        Returns:
            Predicted AQI value (float), or a details dict when
            return_details is True.

        Raises:
            ValueError: If the model framework could not be detected.
        """
        features_df = self.prepare_features(raw_data, historical_data)

        # Apply the training-time scaler before inference.
        features_scaled = self.scaler.transform(features_df)

        if self.model_type == 'xgboost':
            # Native XGBoost boosters require a DMatrix wrapper.
            dmatrix = xgb.DMatrix(features_scaled)
            prediction = self.model.predict(dmatrix)[0]
        elif self.model_type == 'lightgbm':
            prediction = self.model.predict(features_scaled)[0]
        elif self.model_type in ['sklearn', 'keras']:
            prediction = self.model.predict(features_scaled)[0]
            # Keras returns a 2-D array; unwrap to a scalar.
            if isinstance(prediction, np.ndarray):
                prediction = prediction[0]
        else:
            raise ValueError(f"Unsupported model type: {self.model_type}")

        # Clip to the valid AQI scale (0-500).
        prediction = np.clip(prediction, 0, 500)

        if return_details:
            return {
                'aqi': float(prediction),
                'category': self._get_aqi_category(prediction),
                'timestamp': datetime.now().isoformat(),
                'input_features': raw_data
            }

        return float(prediction)

    @staticmethod
    def _get_aqi_category(aqi_value):
        """Map an AQI value to its category name and display color."""
        if aqi_value <= 50:
            return {'level': 'Good', 'color': 'green'}
        elif aqi_value <= 100:
            return {'level': 'Moderate', 'color': 'yellow'}
        elif aqi_value <= 150:
            return {'level': 'Unhealthy for Sensitive Groups', 'color': 'orange'}
        elif aqi_value <= 200:
            return {'level': 'Unhealthy', 'color': 'red'}
        elif aqi_value <= 300:
            return {'level': 'Very Unhealthy', 'color': 'purple'}
        else:
            return {'level': 'Hazardous', 'color': 'maroon'}

# Wire the loaded model, scaler, and feature list into one pipeline object.
predictor = AQIPredictor(xgb_model, scaler, feature_names)
print("Prediction pipeline ready!")

Making Predictions

Single Prediction

Predict AQI for current conditions.
# Current sensor readings
# NOTE(review): pollutant values are presumably µg/m³ (CO likely mg/m³) —
# confirm the units match what the model was trained on.
current_data = {
    'timestamp': '2024-03-15 14:00:00',
    'pm25': 35.2,
    'pm10': 58.7,
    'no2': 42.1,
    'so2': 12.3,
    'co': 0.8,
    'o3': 65.4,
    'temperature': 22.5,
    'humidity': 55.0,
    'wind_speed': 3.2,
    'wind_direction': 180,
    'pressure': 1013.2
}

# Make prediction; return_details=True yields aqi + category + timestamp.
aqi_prediction = predictor.predict(current_data, return_details=True)

print(f"Predicted AQI: {aqi_prediction['aqi']:.1f}")
print(f"Category: {aqi_prediction['category']['level']}")
print(f"Color: {aqi_prediction['category']['color']}")

Batch Predictions

Process multiple predictions efficiently.
def batch_predict(predictor, data_list):
    """
    Make predictions for multiple data points.

    Args:
        predictor: AQIPredictor instance
        data_list: List of dicts with sensor readings

    Returns:
        List of prediction detail dicts; failed samples yield None.
    """
    results = []

    for idx, reading in enumerate(data_list):
        # Best-effort batch: one bad sample must not abort the rest.
        try:
            results.append(predictor.predict(reading, return_details=True))
        except Exception as e:
            print(f"Error predicting sample {idx}: {str(e)}")
            results.append(None)

    return results

# Example: Load data from CSV; each row becomes one readings dict.
input_df = pd.read_csv('data/new_readings.csv')
data_list = input_df.to_dict('records')

# Batch predict
predictions = batch_predict(predictor, data_list)

# Add predictions to dataframe (None marks samples that failed).
input_df['predicted_aqi'] = [p['aqi'] if p else None for p in predictions]
input_df['aqi_category'] = [p['category']['level'] if p else None for p in predictions]

# Save results
input_df.to_csv('data/predictions.csv', index=False)
print(f"Processed {len(predictions)} predictions")

Time Series Forecasting

Predict future AQI values.
def forecast_aqi(predictor, initial_data, historical_data, hours_ahead=24):
    """
    Forecast AQI for multiple hours ahead.

    Args:
        predictor: AQIPredictor instance
        initial_data: Current sensor readings (must include 'timestamp',
            'pm25', and 'pm10')
        historical_data: Recent historical readings (DataFrame with
            'pm25'/'pm10' columns)
        hours_ahead: Number of hours to forecast

    Returns:
        DataFrame with 'timestamp' and 'aqi' columns, one row per hour.
    """
    forecasts = []
    current_data = initial_data.copy()
    history = historical_data.copy()

    # Anchor all forecast timestamps to the *initial* reading. The previous
    # implementation re-read current_data['timestamp'] (which is advanced
    # every iteration) and then added hour+1 on top, so timestamps drifted:
    # t+1h, t+3h, t+6h, t+10h, ... instead of hourly steps.
    base_time = pd.to_datetime(initial_data['timestamp'])

    for hour in range(hours_ahead):
        # Predict the next hour from current conditions + history.
        aqi_pred = predictor.predict(current_data, history)

        forecast_time = base_time + timedelta(hours=hour + 1)
        forecasts.append({
            'timestamp': forecast_time,
            'aqi': aqi_pred
        })

        # Advance the clock so temporal features reflect the forecast hour.
        # (In production, you'd update with actual new readings or use
        # predicted values.)
        current_data['timestamp'] = forecast_time

        # Append the (unchanged) pollutant levels so lag/rolling features
        # see one row per forecast step.
        new_row = pd.DataFrame([{
            'pm25': current_data['pm25'],
            'pm10': current_data['pm10']
        }])
        history = pd.concat([history, new_row], ignore_index=True)

    return pd.DataFrame(forecasts)

# Load recent historical data (expected to contain 'pm25'/'pm10' columns).
historical = pd.read_parquet('data/processed/recent_readings.parquet')

# Forecast next 24 hours
forecast_df = forecast_aqi(predictor, current_data, historical, hours_ahead=24)

print("24-Hour Forecast:")
print(forecast_df.head(10))

Deployment Scenarios

REST API Endpoint

Deploy as a web service using Flask or FastAPI.
from flask import Flask, request, jsonify
import traceback

app = Flask(__name__)

# Initialize predictor globally — loaded once at import time and shared
# across requests.
predictor = AQIPredictor(xgb_model, scaler, feature_names)

@app.route('/predict', methods=['POST'])
def predict():
    """
    Predict AQI from JSON input.

    Request body:
    {
        "pm25": 35.2,
        "pm10": 58.7,
        "no2": 42.1,
        ...
    }
    """
    try:
        payload = request.get_json()

        # Reject requests missing any core pollutant/weather field.
        required_fields = ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3', 
                          'temperature', 'humidity']
        missing = [f for f in required_fields if f not in payload]
        if missing:
            return jsonify({
                'error': f'Missing required fields: {missing}'
            }), 400

        result = predictor.predict(payload, return_details=True)
        return jsonify(result), 200

    except Exception as e:
        # NOTE(review): returning the traceback to the client leaks
        # implementation details; consider logging it server-side instead.
        return jsonify({
            'error': str(e),
            'traceback': traceback.format_exc()
        }), 500

@app.route('/health', methods=['GET'])
def health():
    # Liveness probe: confirms the process is up (does not exercise the model).
    return jsonify({'status': 'healthy'}), 200

if __name__ == '__main__':
    # Flask dev server; use a production WSGI server for real deployments.
    app.run(host='0.0.0.0', port=5000)

Streaming Predictions

Process real-time sensor data streams.
import time
from collections import deque

class StreamingPredictor:
    """Keeps a rolling window of recent readings for online prediction."""

    def __init__(self, predictor, buffer_size=24):
        self.predictor = predictor
        # Bounded window: the oldest readings fall off automatically.
        self.buffer = deque(maxlen=buffer_size)

    def process_reading(self, reading):
        """Buffer an incoming sensor reading and predict AQI from it."""
        self.buffer.append(reading)

        # Everything *before* the newest reading serves as history.
        past = list(self.buffer)[:-1]
        history = pd.DataFrame(past) if past else None

        return self.predictor.predict(reading, history, return_details=True)

# Initialize streaming predictor with a 24-reading rolling window.
streaming = StreamingPredictor(predictor, buffer_size=24)

# Simulate real-time stream
def simulate_sensor_stream():
    """Simulate real-time sensor readings"""
    while True:
        # Get new reading (from sensor API, message queue, etc.)
        reading = {
            'timestamp': datetime.now().isoformat(),
            'pm25': np.random.uniform(20, 80),
            'pm10': np.random.uniform(30, 120),
            'no2': np.random.uniform(10, 60),
            'so2': np.random.uniform(5, 30),
            'co': np.random.uniform(0.3, 2.0),
            'o3': np.random.uniform(30, 100),
            'temperature': np.random.uniform(15, 30),
            'humidity': np.random.uniform(40, 80)
        }
        
        # Process and predict
        result = streaming.process_reading(reading)
        
        print(f"[{result['timestamp']}] AQI: {result['aqi']:.1f} - {result['category']['level']}")
        
        time.sleep(3600)  # Wait 1 hour

# Run simulation
# simulate_sensor_stream()
For production deployments, consider using message queues (Kafka, RabbitMQ) for handling high-volume streaming data.

Error Handling and Validation

class ValidationError(Exception):
    """Raised when a sensor reading fails validation."""
    pass

def validate_sensor_reading(data):
    """
    Validate sensor reading data.

    Args:
        data: Dict of sensor readings.

    Returns:
        True if the reading is valid.

    Raises:
        ValidationError: If a required field is missing/None, a value is
            non-numeric, or a value falls outside its valid range.
    """
    # Check required fields
    required = ['pm25', 'pm10', 'no2', 'so2', 'co', 'o3', 'temperature', 'humidity']
    missing = [f for f in required if f not in data or data[f] is None]
    if missing:
        raise ValidationError(f"Missing required fields: {missing}")

    # Valid (min, max) range per field.
    ranges = {
        'pm25': (0, 500),
        'pm10': (0, 600),
        'no2': (0, 400),
        'so2': (0, 300),
        'co': (0, 50),
        'o3': (0, 400),
        'temperature': (-50, 60),
        'humidity': (0, 100)
    }

    for field, (min_val, max_val) in ranges.items():
        if field in data:
            value = data[field]
            # Fix: a non-numeric value (e.g. a string) used to escape as a
            # bare TypeError from the comparison; surface it as a
            # ValidationError so callers can handle it uniformly.
            try:
                in_range = min_val <= value <= max_val
            except TypeError:
                raise ValidationError(
                    f"{field}={value!r} is not a number"
                ) from None
            if not in_range:
                raise ValidationError(
                    f"{field}={value} outside valid range [{min_val}, {max_val}]"
                )

    return True

# Use in prediction: validate first, then predict.
try:
    validate_sensor_reading(current_data)
    prediction = predictor.predict(current_data)
    print(f"AQI: {prediction}")
except ValidationError as e:
    # Input failed validation — prediction is skipped.
    print(f"Validation error: {e}")
except Exception as e:
    # Any other failure during feature prep or model inference.
    print(f"Prediction error: {e}")
Always validate input data before making predictions. Invalid sensor readings can produce nonsensical predictions.

Performance Optimization

import time

def benchmark_prediction(predictor, test_data, n_iterations=1000):
    """
    Benchmark prediction latency.

    Args:
        predictor: Object exposing predict(test_data).
        test_data: A single reading passed to every predict() call.
        n_iterations: Number of timed calls.

    Prints mean / median / P95 / P99 latency in milliseconds.
    """
    times = []

    for _ in range(n_iterations):
        # Fix: use perf_counter() instead of time.time() — it is monotonic
        # and high-resolution, whereas time.time() can jump with wall-clock
        # adjustments and skew (even negate) latency measurements.
        start = time.perf_counter()
        predictor.predict(test_data)
        times.append(time.perf_counter() - start)

    times = np.array(times) * 1000  # Convert to ms

    print(f"Prediction Latency:")
    print(f"  Mean: {times.mean():.2f} ms")
    print(f"  Median: {np.median(times):.2f} ms")
    print(f"  P95: {np.percentile(times, 95):.2f} ms")
    print(f"  P99: {np.percentile(times, 99):.2f} ms")

benchmark_prediction(predictor, current_data)

Next Steps

Build docs developers (and LLMs) love