Feature pipelines are critical for real-time machine learning systems. Timeplus Proton serves as a high-performance real-time feature store, enabling you to compute and serve features with millisecond latency.

Use Case Overview

Real-time feature pipelines with Proton enable:
  • ML Feature Computation: Calculate features in real-time as events arrive
  • Low-latency Serving: Serve features to ML models with sub-second latency
  • Windowed Aggregations: Compute time-based features (last 5 minutes, last hour, etc.)
  • Historical Backfill: Support both real-time and batch feature computation
  • Feature Store: Maintain current and historical feature values

Architecture

A typical real-time feature pipeline consists of:
Event Stream → Feature Computation → Feature Store → ML Model Serving
     ↓              (Proton)           (Proton)           ↓
  Raw Events    Materialized Views   Versioned State   Predictions

Tutorial: Real-time Fraud Detection

This example demonstrates building a real-time fraud detection feature pipeline using Proton.

Prerequisites

Start the fraud detection demo with a Docker Compose file like the following:
services:
  proton:
    image: d.timeplus.com/timeplus-io/proton:latest
  fraud-generator:
    image: timeplus/fraud:latest
  jupyter:
    image: jupyter/scipy-notebook:latest
    ports:
      - "8888:8888"
    volumes:
      - ./notebooks:/home/jovyan/work

Step 1: Create Transaction Stream

Capture incoming payment transactions:
CREATE EXTERNAL STREAM transactions (
    transaction_id string,
    user_id string,
    merchant_id string,
    amount float64,
    currency string,
    country string,
    timestamp datetime64(3),
    device_type string,
    ip_address string
)
SETTINGS type='kafka',
         brokers='kafka:9092',
         topic='payment-transactions';

Step 2: Compute Real-time Features

Create materialized views to compute features as transactions arrive:

Feature 1: Transaction Velocity (count in last 5 minutes)
CREATE MATERIALIZED VIEW feature_tx_velocity AS
SELECT
    user_id,
    window_end AS feature_timestamp,
    count() AS tx_count_5m,
    sum(amount) AS tx_amount_5m,
    max(amount) AS max_amount_5m,
    count(DISTINCT merchant_id) AS unique_merchants_5m
FROM hop(transactions, 30s, 5m)  -- 5 min window, update every 30s
GROUP BY user_id, window_end;
Feature 2: Geographic Velocity (transactions from different countries)
CREATE MATERIALIZED VIEW feature_geo_velocity AS
SELECT
    user_id,
    window_end AS feature_timestamp,
    count(DISTINCT country) AS unique_countries_1h,
    count(DISTINCT ip_address) AS unique_ips_1h
FROM hop(transactions, 5m, 1h)
GROUP BY user_id, window_end;
Feature 3: Amount Anomalies (compared to user’s average)
CREATE MATERIALIZED VIEW feature_amount_stats AS
SELECT
    user_id,
    window_end AS feature_timestamp,
    avg(amount) AS avg_amount_24h,
    stddev_pop(amount) AS stddev_amount_24h,
    quantile(0.95)(amount) AS p95_amount_24h
FROM hop(transactions, 1h, 24h)
GROUP BY user_id, window_end;
Feature 4: Merchant Risk Score
CREATE MATERIALIZED VIEW feature_merchant_risk AS
SELECT
    merchant_id,
    window_end AS feature_timestamp,
    count() AS merchant_tx_count_1h,
    count(DISTINCT user_id) AS unique_users_1h,
    avg(amount) AS merchant_avg_amount_1h
FROM hop(transactions, 5m, 1h)
GROUP BY merchant_id, window_end;
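
To make the hopping windows used above more concrete, here is a minimal Python sketch of how one event falls into several overlapping windows for `hop(transactions, 30s, 5m)`. The `hop_windows` helper is hypothetical, and the epoch-aligned, half-open `[start, end)` windows are an assumption for illustration; Proton's exact alignment may differ.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def hop_windows(ts, step, size):
    """Return every [start, end) hopping window that contains ts.

    Assumes window starts are aligned to multiples of `step` since the
    Unix epoch (an assumption made for this sketch).
    """
    # Latest aligned window start at or before ts.
    start = ts - ((ts - EPOCH) % step)
    windows = []
    # Walk backwards until the window no longer covers ts.
    while start + size > ts:
        windows.append((start, start + size))
        start -= step
    return windows[::-1]  # oldest window first

event_time = datetime(2024, 1, 1, 12, 0, 10)
windows = hop_windows(event_time, timedelta(seconds=30), timedelta(minutes=5))
print(len(windows))  # 10 overlapping windows contain this event
```

Each transaction therefore contributes to ten 5-minute aggregates, which is why every `GROUP BY ... window_end` view emits a fresh value for a user every 30 seconds.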

Step 3: Create Feature Store Table

Maintain current feature values for each user:
CREATE STREAM feature_store (
    user_id string,
    feature_timestamp datetime64(3),
    -- Velocity features
    tx_count_5m uint32,
    tx_amount_5m float64,
    max_amount_5m float64,
    unique_merchants_5m uint32,
    -- Geographic features
    unique_countries_1h uint32,
    unique_ips_1h uint32,
    -- Amount statistics
    avg_amount_24h float64,
    stddev_amount_24h float64,
    p95_amount_24h float64
)
PRIMARY KEY user_id
SETTINGS mode='versioned_kv', version_column='feature_timestamp';
Populate the feature store by joining computed features:
CREATE MATERIALIZED VIEW populate_feature_store INTO feature_store AS
SELECT
    vel.user_id,
    vel.feature_timestamp,
    vel.tx_count_5m,
    vel.tx_amount_5m,
    vel.max_amount_5m,
    vel.unique_merchants_5m,
    geo.unique_countries_1h,
    geo.unique_ips_1h,
    stats.avg_amount_24h,
    stats.stddev_amount_24h,
    stats.p95_amount_24h
FROM feature_tx_velocity AS vel
LEFT JOIN feature_geo_velocity AS geo ON vel.user_id = geo.user_id AND vel.feature_timestamp = geo.feature_timestamp
LEFT JOIN feature_amount_stats AS stats ON vel.user_id = stats.user_id AND vel.feature_timestamp = stats.feature_timestamp;
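
The effect of `mode='versioned_kv'` with `version_column='feature_timestamp'` can be pictured as a keyed upsert that keeps only the newest version per `user_id`. The sketch below is a conceptual model in plain Python, not Proton's implementation; the `upsert` helper and the integer timestamps are illustrative assumptions.

```python
def upsert(store, row, key="user_id", version="feature_timestamp"):
    """Keep only the row with the highest version for each key."""
    current = store.get(row[key])
    # Replace the stored row only if the incoming one is at least as new.
    if current is None or row[version] >= current[version]:
        store[row[key]] = row
    return store

store = {}
upsert(store, {"user_id": "u1", "feature_timestamp": 100, "tx_count_5m": 2})
upsert(store, {"user_id": "u1", "feature_timestamp": 130, "tx_count_5m": 3})
# A late row carrying an older version does not overwrite the newer one.
upsert(store, {"user_id": "u1", "feature_timestamp": 100, "tx_count_5m": 9})
print(store["u1"]["tx_count_5m"])  # 3
```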

Step 4: Serve Features to ML Models

Query features for a specific user in real-time:
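-- Get the latest features for a user.
-- table() runs a bounded query over stored data instead of an unbounded streaming query.
SELECT *
FROM table(feature_store)
WHERE user_id = 'user_12345'
ORDER BY feature_timestamp DESC
LIMIT 1;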
Or use the Python driver for model serving:
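from proton_driver import client

# 8463 is the native protocol port used by proton_driver
proton = client.Client(host='localhost', port=8463)

def get_features(user_id):
    # The driver substitutes %(user_id)s, so the value is never
    # interpolated into the SQL string by hand.
    query = """
    SELECT
        tx_count_5m,
        tx_amount_5m,
        unique_countries_1h,
        avg_amount_24h,
        stddev_amount_24h
    FROM table(feature_store)
    WHERE user_id = %(user_id)s
    ORDER BY feature_timestamp DESC
    LIMIT 1
    """
    # execute() returns a list of row tuples
    rows = proton.execute(query, {'user_id': user_id})
    return rows[0] if rows else None

# Use in ML inference (fraud_model is assumed to be a trained scikit-learn-style classifier)
features = get_features('user_12345')
if features:
    probability = fraud_model.predict_proba([list(features)])[0][1]
    print(f"Fraud probability: {probability:.2f}")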
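
A model expects its inputs in the same column order it was trained on, so it can help to make that order explicit before calling the model. The following is a small sketch under that assumption; `FEATURE_ORDER` and `to_feature_vector` are hypothetical names, and filling missing values with `0.0` is only one possible policy.

```python
# Column order must match the SELECT list used to fetch the row.
FEATURE_ORDER = [
    "tx_count_5m",
    "tx_amount_5m",
    "unique_countries_1h",
    "avg_amount_24h",
    "stddev_amount_24h",
]

def to_feature_vector(row, default=0.0):
    """Convert one fetched row (a tuple) into a list of floats for the model."""
    if len(row) != len(FEATURE_ORDER):
        raise ValueError(f"expected {len(FEATURE_ORDER)} values, got {len(row)}")
    # Replace missing values (None) with the default before casting.
    return [float(default if value is None else value) for value in row]

vector = to_feature_vector((3, 120.5, None, 40.0, 5.0))
print(vector)  # [3.0, 120.5, 0.0, 40.0, 5.0]
```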

Step 5: Create Fraud Detection Logic

Combine features to detect potential fraud:
CREATE MATERIALIZED VIEW fraud_alerts AS
SELECT
    t.transaction_id,
    t.user_id,
    t.amount,
    t.timestamp,
    f.tx_count_5m,
    f.unique_countries_1h,
    f.avg_amount_24h,
    f.stddev_amount_24h,
    -- Flag suspicious patterns
    CASE
        WHEN f.tx_count_5m > 10 THEN 'high_velocity'
        WHEN f.unique_countries_1h > 3 THEN 'geo_anomaly'
        WHEN t.amount > f.avg_amount_24h + 3 * f.stddev_amount_24h THEN 'amount_anomaly'
        WHEN t.amount > 5 * f.p95_amount_24h THEN 'extreme_amount'
        ELSE 'normal'
    END AS risk_flag
FROM transactions AS t
JOIN feature_store AS f ON t.user_id = f.user_id
WHERE risk_flag != 'normal';
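
A `CASE` expression returns the first matching branch, so the order of the rules above decides which flag a transaction receives. The Python sketch below mirrors the same thresholds to make that ordering easy to test offline; the `risk_flag` function and the sample feature values are illustrative only.

```python
def risk_flag(amount, f):
    """Mirror the CASE expression: the first matching rule wins."""
    if f["tx_count_5m"] > 10:
        return "high_velocity"
    if f["unique_countries_1h"] > 3:
        return "geo_anomaly"
    if amount > f["avg_amount_24h"] + 3 * f["stddev_amount_24h"]:
        return "amount_anomaly"
    if amount > 5 * f["p95_amount_24h"]:
        return "extreme_amount"
    return "normal"

busy_user = {"tx_count_5m": 12, "unique_countries_1h": 5,
             "avg_amount_24h": 50.0, "stddev_amount_24h": 10.0, "p95_amount_24h": 80.0}
quiet_user = {"tx_count_5m": 2, "unique_countries_1h": 1,
              "avg_amount_24h": 50.0, "stddev_amount_24h": 200.0, "p95_amount_24h": 100.0}

print(risk_flag(1000.0, busy_user))   # high_velocity (earlier rule hides the geo anomaly)
print(risk_flag(600.0, quiet_user))   # extreme_amount (600 <= 650, but 600 > 500)
```

Because only one flag is reported per transaction, a user who trips several rules is labeled with the earliest one; reorder the branches if a different priority is needed.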

Advanced Feature Patterns

Session-based Features

Compute features within user sessions:
CREATE MATERIALIZED VIEW feature_session AS
SELECT
    user_id,
    session_start,
    session_end,
    count() AS events_per_session,
    max(timestamp) - min(timestamp) AS session_duration_ms,
    count(DISTINCT page_url) AS unique_pages_visited
FROM session(clickstream, user_id, 30m)  -- 30 min session timeout
GROUP BY user_id, session_start, session_end;
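
As a rough illustration of the 30-minute session timeout, the sketch below groups one user's event timestamps into sessions in plain Python. The `sessionize` helper is hypothetical, and treating a gap of exactly 30 minutes as part of the same session is an assumption; Proton's boundary handling may differ.

```python
from datetime import datetime, timedelta

def sessionize(timestamps, timeout=timedelta(minutes=30)):
    """Split event timestamps into sessions separated by gaps longer than timeout."""
    sessions = []
    for ts in sorted(timestamps):
        # Extend the current session if the gap since its last event is small enough.
        if sessions and ts - sessions[-1][-1] <= timeout:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions

events = [
    datetime(2024, 1, 1, 9, 0),
    datetime(2024, 1, 1, 9, 10),
    datetime(2024, 1, 1, 9, 55),   # 45 minutes after the previous event: new session
    datetime(2024, 1, 1, 10, 5),
]
sessions = sessionize(events)
print([len(s) for s in sessions])  # [2, 2]
```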

Ratio and Percentage Features

CREATE MATERIALIZED VIEW feature_ratios AS
SELECT
    user_id,
    window_end AS feature_timestamp,
    count_if(action = 'purchase') AS purchases,
    count_if(action = 'view') AS views,
    purchases * 1.0 / nullif(views, 0) AS purchase_rate,
    count_if(declined = true) * 100.0 / count() AS decline_rate_pct
FROM hop(user_events, 1h, 24h)
GROUP BY user_id, window_end;
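
The `nullif(views, 0)` guard above yields a missing value rather than dividing by zero when a user has no views in the window. A minimal Python equivalent, with a hypothetical `safe_ratio` helper and made-up sample actions, looks like this:

```python
def safe_ratio(numerator, denominator):
    """Return None instead of dividing by zero, like x / nullif(y, 0)."""
    if denominator == 0:
        return None
    return numerator / denominator

actions = ["view", "view", "purchase", "view", "view"]
purchases = actions.count("purchase")
views = actions.count("view")

purchase_rate = safe_ratio(purchases, views)
print(purchase_rate)         # 0.25
print(safe_ratio(3, 0))      # None
```

Downstream models then need an explicit policy for the missing value, for example imputing a default or dropping the feature for that user.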

Lag Features (change from previous window)

CREATE MATERIALIZED VIEW feature_trends AS
SELECT
    user_id,
    window_end,
    tx_amount,
    lag(tx_amount, 1) OVER (PARTITION BY user_id ORDER BY window_end) AS prev_amount,
    tx_amount - prev_amount AS amount_change,
    (tx_amount - prev_amount) * 100.0 / nullif(prev_amount, 0) AS amount_change_pct
FROM (
    SELECT user_id, window_end, sum(amount) AS tx_amount
    FROM tumble(transactions, 1h)
    GROUP BY user_id, window_end
);
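
The lag computation can be checked offline with a small Python sketch that walks each user's hourly totals in order and compares every window with the previous one. The `add_lag_features` helper and the integer window identifiers are assumptions made for illustration.

```python
def add_lag_features(hourly_totals):
    """hourly_totals: iterable of (user_id, window_end, tx_amount) tuples."""
    previous = {}  # user_id -> tx_amount of that user's previous window
    rows = []
    for user_id, window_end, amount in sorted(hourly_totals):
        prev = previous.get(user_id)
        change = None if prev is None else amount - prev
        # Skip the percentage when the previous value is missing or zero,
        # like dividing by nullif(prev_amount, 0).
        pct = None if not prev else change * 100.0 / prev
        rows.append((user_id, window_end, amount, prev, change, pct))
        previous[user_id] = amount
    return rows

totals = [("u1", 2, 150.0), ("u1", 1, 100.0), ("u1", 3, 0.0), ("u1", 4, 30.0)]
for row in add_lag_features(totals):
    print(row)
```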

Historical Backfill Support

Proton supports both real-time and historical feature computation:

Batch Feature Computation

-- Compute features from historical data.
-- Only the listed columns are written; the other feature_store columns keep their defaults.
INSERT INTO feature_store (user_id, feature_timestamp, tx_count_5m, tx_amount_5m)
SELECT
    user_id,
    timestamp AS feature_timestamp,
    count() OVER (PARTITION BY user_id ORDER BY timestamp RANGE BETWEEN INTERVAL 5 MINUTE PRECEDING AND CURRENT ROW) AS tx_count_5m,
    sum(amount) OVER (PARTITION BY user_id ORDER BY timestamp RANGE BETWEEN INTERVAL 5 MINUTE PRECEDING AND CURRENT ROW) AS tx_amount_5m
FROM historical_transactions;
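
For validating a backfill, the trailing 5-minute window can be reproduced in plain Python and compared against a sample of the SQL output. This sketch assumes each user's timestamps are distinct (rows sharing a timestamp would be counted slightly differently from a SQL `RANGE` frame); `rolling_features` is a hypothetical helper.

```python
from collections import deque
from datetime import datetime, timedelta

def rolling_features(transactions, window=timedelta(minutes=5)):
    """transactions: iterable of (user_id, timestamp, amount).

    Returns (user_id, timestamp, tx_count, tx_amount) over [timestamp - window, timestamp].
    """
    recent = {}  # user_id -> deque of (timestamp, amount) still inside the window
    out = []
    for user_id, ts, amount in sorted(transactions):
        q = recent.setdefault(user_id, deque())
        q.append((ts, amount))
        # Drop rows that fell out of the trailing window.
        while q[0][0] < ts - window:
            q.popleft()
        out.append((user_id, ts, len(q), sum(a for _, a in q)))
    return out

txs = [
    ("u1", datetime(2024, 1, 1, 12, 0), 10.0),
    ("u1", datetime(2024, 1, 1, 12, 3), 20.0),
    ("u1", datetime(2024, 1, 1, 12, 6), 5.0),
]
for row in rolling_features(txs):
    print(row[2], row[3])
```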

Point-in-time Correctness

Ensure features reflect what was known at prediction time:
CREATE TABLE feature_store_versioned (
    user_id string,
    feature_timestamp datetime64(3),
    computation_timestamp datetime64(3) DEFAULT now64(),
    features map(string, float64)
)
PRIMARY KEY (user_id, feature_timestamp);
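
One way to read this table for point-in-time correctness is to pick, for a given prediction time, the newest feature row that had already been computed by then. The sketch below shows that selection in plain Python; `features_as_of` is a hypothetical helper and the integer timestamps are placeholders for `datetime64` values.

```python
def features_as_of(rows, as_of):
    """Return the newest row that was both valid and already computed at as_of."""
    known = [
        r for r in rows
        if r["feature_timestamp"] <= as_of and r["computation_timestamp"] <= as_of
    ]
    return max(known, key=lambda r: r["feature_timestamp"], default=None)

rows = [
    {"feature_timestamp": 100, "computation_timestamp": 105, "features": {"tx_count_5m": 2.0}},
    # Computed late: not yet available to a prediction made at t=250.
    {"feature_timestamp": 200, "computation_timestamp": 260, "features": {"tx_count_5m": 7.0}},
]

print(features_as_of(rows, 250)["feature_timestamp"])  # 100
print(features_as_of(rows, 300)["feature_timestamp"])  # 200
print(features_as_of(rows, 50))                        # None
```

Filtering on `computation_timestamp` as well as `feature_timestamp` keeps late-arriving values out of training examples whose prediction happened before the value existed.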

Integration with ML Frameworks

Python + scikit-learn

import pandas as pd
from proton_driver import client
from sklearn.ensemble import RandomForestClassifier

proton = client.Client()

# Fetch training data
query = """
SELECT
    tx_count_5m,
    tx_amount_5m,
    unique_countries_1h,
    avg_amount_24h,
    is_fraud
FROM table(feature_store_labeled)
WHERE feature_timestamp > now64() - INTERVAL 30 DAY
"""

df = proton.query_dataframe(query)
X = df.drop('is_fraud', axis=1)
y = df['is_fraud']

model = RandomForestClassifier()
model.fit(X, y)

Real-time Inference

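def predict_fraud(transaction_id):
    # Fetch real-time features for the user behind this transaction
    query = """
    SELECT f.*
    FROM transactions t
    JOIN feature_store f ON t.user_id = f.user_id
    WHERE t.transaction_id = %(transaction_id)s
    """
    # with_column_types=True also returns (name, type) pairs for each column
    rows, columns = proton.execute(
        query, {'transaction_id': transaction_id}, with_column_types=True
    )
    if not rows:
        return None  # No features found for this transaction

    # Map column names to values so features can be looked up by name
    features = dict(zip((name for name, _ in columns), rows[0]))

    # Predict
    X = [[features['tx_count_5m'], features['tx_amount_5m'], ...]]
    probability = model.predict_proba(X)[0][1]

    return probability > 0.8  # Fraud threshold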

Performance Optimization

Use Appropriate Window Sizes

-- Efficient: Aligned windows
FROM tumble(stream, 1m)  -- Non-overlapping

-- Less efficient: Small slide interval
FROM hop(stream, 1s, 5m)  -- Creates many windows

-- Balance: Reasonable slide interval
FROM hop(stream, 30s, 5m)  -- Good balance
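
The cost difference between these options comes largely from how many windows each event belongs to, which is roughly the window size divided by the slide interval. A quick Python calculation (the `windows_per_event` helper is hypothetical) makes the comparison explicit:

```python
import math
from datetime import timedelta

def windows_per_event(step, size):
    """Approximate number of overlapping windows that contain a single event."""
    return math.ceil(size / step)

print(windows_per_event(timedelta(minutes=1), timedelta(minutes=1)))   # tumble(stream, 1m)   -> 1
print(windows_per_event(timedelta(seconds=1), timedelta(minutes=5)))   # hop(stream, 1s, 5m)  -> 300
print(windows_per_event(timedelta(seconds=30), timedelta(minutes=5)))  # hop(stream, 30s, 5m) -> 10
```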

Materialize Common Joins

-- Instead of joining in every query
CREATE MATERIALIZED VIEW feature_combined AS
SELECT
    t.user_id,
    t.timestamp,
    f1.feature_a,
    f2.feature_b,
    f3.feature_c
FROM transactions t
JOIN feature_view_1 f1 USING (user_id)
JOIN feature_view_2 f2 USING (user_id)
JOIN feature_view_3 f3 USING (user_id);

Use versioned_kv for Latest Values

CREATE STREAM latest_features (
    user_id string,
    updated_at datetime64(3),
    features map(string, float64)
)
PRIMARY KEY user_id
SETTINGS mode='versioned_kv', version_column='updated_at';
Query the latest version without ORDER BY + LIMIT:
SELECT * FROM table(latest_features) WHERE user_id = 'user_123';

Monitoring Feature Quality

Track Feature Freshness

SELECT
    user_id,
    feature_timestamp,
    now64() - feature_timestamp AS feature_age_ms
FROM feature_store
WHERE feature_age_ms > 60000  -- Alert if features >1 min old
ORDER BY feature_age_ms DESC;
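
The same freshness rule can be applied on the client side, for example in a serving process that wants to refuse stale features. This is a minimal sketch with a hypothetical `stale_features` helper and made-up sample rows, using the same 60-second threshold as the query above.

```python
from datetime import datetime, timedelta

def stale_features(rows, now, max_age=timedelta(seconds=60)):
    """rows: iterable of (user_id, feature_timestamp). Returns stale entries, oldest first."""
    stale = [(user_id, now - ts) for user_id, ts in rows if now - ts > max_age]
    return sorted(stale, key=lambda item: item[1], reverse=True)

now = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    ("u1", now - timedelta(seconds=5)),    # fresh
    ("u2", now - timedelta(seconds=90)),   # stale
    ("u3", now - timedelta(minutes=10)),   # stalest
]
alerts = stale_features(rows, now)
print([user_id for user_id, _ in alerts])  # ['u3', 'u2']
```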

Feature Distribution Monitoring

SELECT
    quantile(0.01)(tx_count_5m) AS p01,
    quantile(0.50)(tx_count_5m) AS p50,
    quantile(0.99)(tx_count_5m) AS p99,
    avg(tx_count_5m) AS mean,
    stddev_pop(tx_count_5m) AS stddev
FROM feature_store
WHERE feature_timestamp > now64() - INTERVAL 1 HOUR;
