Overview
Deploying ML models to production requires careful consideration of serving infrastructure, scalability, monitoring, and reliability. This guide covers end-to-end deployment strategies for AQI prediction systems.

Deployment Architecture
Three architecture patterns cover most deployments: batch prediction, real-time serving, and streaming.

Batch Prediction

Suitable for scheduled AQI forecasts:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Data │────▶│ Model │────▶│ Results │
│ Store │ │ Service │ │ Database │
└─────────────┘ └──────────────┘ └─────────────┘
│ │ │
│ │ │
▼ ▼ ▼
Daily/Hourly Batch Process API/Dashboard
Use cases:
- Daily AQI forecasts
- Historical reprocessing
- Bulk predictions for multiple locations
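A minimal sketch of this batch path, assuming features land in Parquet files and a joblib-serialized model (paths and column names are illustrative):

import joblib
import pandas as pd

FEATURE_COLUMNS = [
    "pm25", "pm10", "no2", "so2", "co", "o3", "temperature",
    "humidity", "wind_speed", "pressure", "hour", "day_of_week", "month",
]

def run_batch_forecast(input_path: str, output_path: str) -> None:
    """Score one batch of feature rows and persist results for the API/dashboard."""
    model = joblib.load("models/production/model.pkl")
    df = pd.read_parquet(input_path)                      # one row per location/hour
    df["predicted_aqi"] = model.predict(df[FEATURE_COLUMNS])
    df.to_parquet(output_path)

Schedule the job hourly or daily with cron, Airflow, or a Kubernetes CronJob.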
Real-Time Serving

For on-demand predictions:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Client │────▶│ Load │────▶│ Model │
│ Request │ │ Balancer │ │ Server │
└─────────────┘ └──────────────┘ └─────────────┘
│ │
│ │
▼ ▼
Multiple Redis Cache
Replicas
Use cases:
- User-facing applications
- API integrations
- Real-time alerts
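A quick client-side sketch of the request/response flow, assuming the FastAPI service defined later in this guide is running locally:

import requests

sample = {
    "pm25": 35.5, "pm10": 50.2, "no2": 25.3, "so2": 8.1, "co": 0.6, "o3": 45.7,
    "temperature": 22.5, "humidity": 65.0, "wind_speed": 3.2, "pressure": 1013.25,
    "hour": 14, "day_of_week": 2, "month": 6,
}
response = requests.post("http://localhost:8000/predict", json=sample, timeout=5)
response.raise_for_status()
print(response.json())  # e.g. {"aqi": 87.3, "category": "Moderate", ...}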
Streaming

For continuous prediction on sensor data:
┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Sensor │────▶│ Kafka/ │────▶│ Stream │
│ Data │ │ Kinesis │ │ Processor │
└─────────────┘ └──────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Model │
│ Inference │
└─────────────┘
Use cases:
- IoT sensor networks
- Continuous monitoring
- Real-time anomaly detection
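A minimal sketch of the stream-processor stage, assuming Kafka with the kafka-python client; topic names and the message schema are illustrative:

import json
import joblib
import numpy as np
from kafka import KafkaConsumer, KafkaProducer

model = joblib.load("models/production/model.pkl")
consumer = KafkaConsumer(
    "sensor-readings",                     # hypothetical input topic
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    reading = message.value                # expected shape: {"sensor_id": ..., "features": [...]}
    features = np.array([reading["features"]])
    aqi = float(model.predict(features)[0])
    producer.send("aqi-predictions", {"sensor_id": reading.get("sensor_id"), "aqi": aqi})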
Containerization
Docker Setup
Create a production-ready Docker image:

FROM python:3.10-slim
# Set working directory
WORKDIR /app
# Install system dependencies (curl is needed by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*
# Copy requirements and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY src/ ./src/
COPY models/ ./models/
COPY config/ ./config/
# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Run application
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]
Use multi-stage builds to reduce image size and separate build-time from runtime dependencies.
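A sketch of that pattern, assuming the same requirements.txt and entrypoint as above:

# Stage 1: build dependencies with compilers available
FROM python:3.10-slim AS builder
WORKDIR /app
RUN apt-get update && apt-get install -y gcc g++ && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Stage 2: slim runtime image without build tools
FROM python:3.10-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY src/ ./src/
CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000"]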
Building and Testing
# Build image
docker build -t aqi-predictor:latest .
# Run tests in container
docker run --rm aqi-predictor:latest pytest tests/
# Start services
docker-compose up -d
# View logs
docker-compose logs -f api
# Scale services
docker-compose up -d --scale api=5
# Stop services
docker-compose down
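The compose commands above assume a docker-compose.yml; a minimal sketch (service name and settings are illustrative) might look like:

version: "3.9"
services:
  api:
    build: .
    image: aqi-predictor:latest
    ports:
      - "8000"          # publish the container port only, so --scale avoids host-port conflicts
    environment:
      LOG_LEVEL: info
    restart: unless-stopped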
REST API Implementation
FastAPI Service

The service below exposes single and batch prediction endpoints, health and readiness probes, and Prometheus metrics:
from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from pydantic import BaseModel, Field
import joblib
import numpy as np
import logging
from typing import List
import time
# Initialize app
app = FastAPI(
title="AQI Predictor API",
description="Production API for Air Quality Index predictions",
version="1.0.0"
)
# CORS middleware (restrict allow_origins to known domains in production)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Metrics
prediction_counter = Counter('predictions_total', 'Total predictions made')
prediction_duration = Histogram('prediction_duration_seconds', 'Prediction latency')
error_counter = Counter('prediction_errors_total', 'Total prediction errors')
# Load model at startup
model = None
feature_names = None
@app.on_event("startup")
async def load_model():
global model, feature_names
try:
model = joblib.load('/app/models/production/model.pkl')
feature_names = joblib.load('/app/models/production/features.pkl')
logging.info("Model loaded successfully")
except Exception as e:
logging.error(f"Failed to load model: {e}")
raise
# Request/Response models
class PredictionRequest(BaseModel):
pm25: float = Field(..., ge=0, le=500, description="PM2.5 concentration")
pm10: float = Field(..., ge=0, le=600, description="PM10 concentration")
no2: float = Field(..., ge=0, le=200, description="NO2 concentration")
so2: float = Field(..., ge=0, le=100, description="SO2 concentration")
co: float = Field(..., ge=0, le=50, description="CO concentration")
o3: float = Field(..., ge=0, le=300, description="O3 concentration")
temperature: float = Field(..., ge=-50, le=60, description="Temperature (°C)")
humidity: float = Field(..., ge=0, le=100, description="Relative humidity (%)")
wind_speed: float = Field(..., ge=0, le=50, description="Wind speed (m/s)")
pressure: float = Field(..., ge=900, le=1100, description="Pressure (hPa)")
hour: int = Field(..., ge=0, le=23, description="Hour of day")
day_of_week: int = Field(..., ge=0, le=6, description="Day of week")
month: int = Field(..., ge=1, le=12, description="Month")
class Config:
json_schema_extra = {
"example": {
"pm25": 35.5,
"pm10": 50.2,
"no2": 25.3,
"so2": 8.1,
"co": 0.6,
"o3": 45.7,
"temperature": 22.5,
"humidity": 65.0,
"wind_speed": 3.2,
"pressure": 1013.25,
"hour": 14,
"day_of_week": 2,
"month": 6
}
}
class PredictionResponse(BaseModel):
aqi: float = Field(..., description="Predicted AQI value")
category: str = Field(..., description="AQI category")
confidence: float = Field(..., description="Prediction confidence score")
processing_time_ms: float = Field(..., description="Processing time")
class BatchPredictionRequest(BaseModel):
instances: List[PredictionRequest]
class BatchPredictionResponse(BaseModel):
predictions: List[PredictionResponse]
total_processing_time_ms: float
# Helper functions
def get_aqi_category(aqi: float) -> str:
"""Convert AQI value to category."""
if aqi <= 50:
return "Good"
elif aqi <= 100:
return "Moderate"
elif aqi <= 150:
return "Unhealthy for Sensitive Groups"
elif aqi <= 200:
return "Unhealthy"
elif aqi <= 300:
return "Very Unhealthy"
else:
return "Hazardous"
def calculate_confidence(prediction: float, features: np.ndarray) -> float:
    """Heuristic placeholder confidence based on the predicted range."""
    # Simplified: in production, use prediction intervals or model uncertainty.
    return 0.85 if 0 <= prediction <= 500 else 0.60
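# Alternative sketch (an assumption, not the deployed method): for a tree
# ensemble such as RandomForestRegressor, use per-tree spread as an
# uncertainty proxy. The 100.0 scale factor is illustrative, not calibrated.
def calculate_confidence_ensemble(ensemble, features: np.ndarray) -> float:
    """Confidence from per-tree agreement; smaller spread means higher confidence."""
    per_tree = np.array([tree.predict(features)[0] for tree in ensemble.estimators_])
    return float(np.clip(1.0 - per_tree.std() / 100.0, 0.0, 1.0))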
# Endpoints
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"model_loaded": model is not None,
"timestamp": time.time()
}
@app.get("/ready")
async def readiness_check():
"""Readiness check endpoint."""
if model is None:
raise HTTPException(status_code=503, detail="Model not loaded")
return {"status": "ready"}
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
"""Single prediction endpoint."""
start_time = time.time()
try:
# Prepare features
features = np.array([[
request.pm25, request.pm10, request.no2, request.so2,
request.co, request.o3, request.temperature, request.humidity,
request.wind_speed, request.pressure, request.hour,
request.day_of_week, request.month
]])
# Make prediction
aqi = float(model.predict(features)[0])
category = get_aqi_category(aqi)
confidence = calculate_confidence(aqi, features)
# Record metrics
prediction_counter.inc()
processing_time = (time.time() - start_time) * 1000
prediction_duration.observe(processing_time / 1000)
return PredictionResponse(
aqi=round(aqi, 2),
category=category,
confidence=round(confidence, 3),
processing_time_ms=round(processing_time, 2)
)
except Exception as e:
error_counter.inc()
logging.error(f"Prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/predict/batch", response_model=BatchPredictionResponse)
async def predict_batch(request: BatchPredictionRequest):
"""Batch prediction endpoint."""
start_time = time.time()
try:
predictions = []
for instance in request.instances:
features = np.array([[
instance.pm25, instance.pm10, instance.no2, instance.so2,
instance.co, instance.o3, instance.temperature, instance.humidity,
instance.wind_speed, instance.pressure, instance.hour,
instance.day_of_week, instance.month
]])
aqi = float(model.predict(features)[0])
category = get_aqi_category(aqi)
confidence = calculate_confidence(aqi, features)
predictions.append(PredictionResponse(
aqi=round(aqi, 2),
category=category,
confidence=round(confidence, 3),
processing_time_ms=0
))
prediction_counter.inc(len(predictions))
total_time = (time.time() - start_time) * 1000
return BatchPredictionResponse(
predictions=predictions,
total_processing_time_ms=round(total_time, 2)
)
except Exception as e:
error_counter.inc()
logging.error(f"Batch prediction error: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
return generate_latest()
@app.get("/model/info")
async def model_info():
"""Model metadata endpoint."""
return {
"model_type": type(model).__name__,
"features": feature_names,
"n_features": len(feature_names) if feature_names else 0
}
Always implement authentication and rate limiting in production. Use API keys, OAuth, or JWT tokens.
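A minimal sketch of API-key authentication as a FastAPI dependency (the header name and key set are assumptions; load keys from a secret manager in practice):

from fastapi import Depends, HTTPException, Security
from fastapi.security.api_key import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
VALID_KEYS = {"example-key"}  # hypothetical; replace with a real key store

async def require_api_key(api_key: str = Security(api_key_header)) -> str:
    if api_key not in VALID_KEYS:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return api_key

# Attach to an endpoint:
# @app.post("/predict", dependencies=[Depends(require_api_key)])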
Kubernetes Deployment
Kubernetes Manifests

A Deployment with rolling updates, resource limits, health probes, and a read-only model volume:

apiVersion: apps/v1
kind: Deployment
metadata:
name: aqi-predictor
namespace: ml-services
labels:
app: aqi-predictor
version: v1
spec:
replicas: 3
selector:
matchLabels:
app: aqi-predictor
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1
maxUnavailable: 0
template:
metadata:
labels:
app: aqi-predictor
version: v1
spec:
containers:
- name: api
image: your-registry/aqi-predictor:v1.0.0
imagePullPolicy: Always
ports:
- containerPort: 8000
name: http
protocol: TCP
env:
- name: MODEL_PATH
value: /models/production
- name: LOG_LEVEL
value: info
- name: WORKERS
value: "4"
resources:
requests:
cpu: 1000m
memory: 2Gi
limits:
cpu: 2000m
memory: 4Gi
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 3
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
timeoutSeconds: 3
failureThreshold: 2
volumeMounts:
- name: model-volume
mountPath: /models
readOnly: true
volumes:
- name: model-volume
persistentVolumeClaim:
claimName: model-pvc
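The commands below also apply service.yaml, ingress.yaml, and hpa.yaml. As one example, a minimal HorizontalPodAutoscaler for this Deployment might look like the following (the utilization target is an assumption to tune):

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: aqi-predictor
  namespace: ml-services
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: aqi-predictor
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70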
Deployment Commands
# Create namespace
kubectl create namespace ml-services
# Apply manifests
kubectl apply -f deployment.yaml
kubectl apply -f service.yaml
kubectl apply -f ingress.yaml
kubectl apply -f hpa.yaml
# Verify deployment
kubectl get pods -n ml-services
kubectl get svc -n ml-services
kubectl get ingress -n ml-services
# View logs
kubectl logs -f deployment/aqi-predictor -n ml-services
# Rolling update
kubectl set image deployment/aqi-predictor \
api=your-registry/aqi-predictor:v1.1.0 \
-n ml-services
# Rollback
kubectl rollout undo deployment/aqi-predictor -n ml-services
# Scale manually
kubectl scale deployment/aqi-predictor --replicas=5 -n ml-services
Model Serving Platforms
Dedicated serving platforms provide model versioning, scaling, and standardized inference APIs out of the box.

TensorFlow Serving
# Export model in SavedModel format
import tensorflow as tf
tf.saved_model.save(model, '/models/aqi_predictor/1')
# Run TF Serving
docker run -p 8501:8501 \
--mount type=bind,source=/models/aqi_predictor,target=/models/aqi_predictor \
-e MODEL_NAME=aqi_predictor \
-t tensorflow/serving
# Make prediction
curl -X POST http://localhost:8501/v1/models/aqi_predictor:predict \
-H 'Content-Type: application/json' \
-d '{"instances": [[35.5, 50.2, 25.3, 8.1, 0.6, 45.7, 22.5, 65.0, 3.2, 1013.25, 14, 2, 6]]}'
Seldon Core

apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: aqi-predictor
spec:
predictors:
- name: default
replicas: 3
graph:
name: classifier
type: MODEL
parameters:
- name: model_uri
value: s3://models/aqi-predictor
- name: xtype
value: ndarray
componentSpecs:
- spec:
containers:
- name: classifier
image: seldonio/mlserver:1.3.5
resources:
requests:
cpu: 1
memory: 2Gi
limits:
cpu: 2
memory: 4Gi
BentoML

import bentoml
import numpy as np
@bentoml.service(
resources={"cpu": "2", "memory": "4Gi"},
traffic={"timeout": 30}
)
class AQIPredictorService:
    # Resolve the stored model reference once at class definition time.
    model_ref = bentoml.models.get("aqi_predictor:latest")

    def __init__(self):
        # Assumes the model was saved with bentoml.sklearn.save_model;
        # use the loader matching your model's framework.
        self.model = bentoml.sklearn.load_model(self.model_ref)

    @bentoml.api
    def predict(self, input_data: np.ndarray) -> dict:
        predictions = self.model.predict(input_data)
        return {"aqi": predictions.tolist()}
# Build and containerize
# bentoml build
# bentoml containerize aqi_predictor:latest
CI/CD Pipeline

A GitHub Actions workflow can run tests, build and push the image, and deploy tagged releases to Kubernetes:

name: Build and Deploy
on:
push:
branches:
- main
tags:
- 'v*'
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov
- name: Run tests
run: pytest tests/ --cov=src --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
build:
needs: test
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- uses: actions/checkout@v3
- name: Log in to registry
uses: docker/login-action@v2
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v4
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Build and push
uses: docker/build-push-action@v4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
deploy:
needs: build
runs-on: ubuntu-latest
if: startsWith(github.ref, 'refs/tags/v')
steps:
- uses: actions/checkout@v3
- name: Configure kubectl
uses: azure/k8s-set-context@v3
with:
kubeconfig: ${{ secrets.KUBECONFIG }}
- name: Deploy to Kubernetes
run: |
kubectl set image deployment/aqi-predictor \
api=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.ref_name }} \
-n ml-services
kubectl rollout status deployment/aqi-predictor -n ml-services
Load Testing

Use Locust to simulate realistic traffic against the API:

from locust import HttpUser, task, between
import random
class AQIPredictorUser(HttpUser):
wait_time = between(1, 3)
def generate_sample(self):
return {
"pm25": random.uniform(0, 200),
"pm10": random.uniform(0, 300),
"no2": random.uniform(0, 100),
"so2": random.uniform(0, 50),
"co": random.uniform(0, 10),
"o3": random.uniform(0, 150),
"temperature": random.uniform(-10, 40),
"humidity": random.uniform(20, 95),
"wind_speed": random.uniform(0, 20),
"pressure": random.uniform(990, 1030),
"hour": random.randint(0, 23),
"day_of_week": random.randint(0, 6),
"month": random.randint(1, 12)
}
@task(10)
def predict_single(self):
self.client.post("/predict", json=self.generate_sample())
@task(1)
def predict_batch(self):
instances = [self.generate_sample() for _ in range(10)]
self.client.post("/predict/batch", json={"instances": instances})
@task(1)
def health_check(self):
self.client.get("/health")
# Run: locust -f load_test.py --host=http://localhost:8000
Best Practices
High Availability
- Deploy multiple replicas across availability zones
- Implement health checks and readiness probes
- Use circuit breakers for downstream dependencies (see the sketch after this list)
- Configure automatic restarts and rollbacks
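A minimal sketch of a circuit breaker around a downstream call (thresholds and the protected call are assumptions):

import time

class CircuitBreaker:
    """Fail fast after repeated downstream failures, then retry after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        # While open, refuse calls until the cooldown elapses.
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_timeout:
                raise RuntimeError("circuit open; failing fast")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.time()
            raise
        self.failures = 0
        return result

# Usage: breaker.call(feature_store_client.fetch, station_id)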
Performance
- Use model caching with Redis or Memcached (see the sketch after this list)
- Batch predictions when possible
- Optimize model size (quantization, pruning)
- Enable GPU acceleration for large models
- Use async processing for non-critical predictions
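A minimal read-through cache sketch using the redis package (key scheme and TTL are assumptions):

import hashlib
import json
import redis

cache = redis.Redis(host="localhost", port=6379, db=0)

def cached_predict(model, payload: dict, ttl_seconds: int = 300) -> float:
    """Return a cached AQI for identical inputs; compute and store on a miss."""
    # Hash the sorted payload for a stable key; feature order passed to the
    # model must still match training (payload keys assumed pre-ordered).
    key = "aqi:" + hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
    hit = cache.get(key)
    if hit is not None:
        return float(hit)
    aqi = float(model.predict([list(payload.values())])[0])
    cache.setex(key, ttl_seconds, aqi)
    return aqi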
Security
- Implement authentication (API keys, OAuth)
- Use TLS/SSL for all communications
- Rate limit API requests
- Validate and sanitize all inputs
- Scan container images for vulnerabilities
- Use non-root users in containers
Cost Optimization
- Right-size compute resources
- Use horizontal pod autoscaling
- Implement request batching
- Cache frequent predictions
- Use spot instances for batch workloads