Feature pipelines are critical for real-time machine learning systems. Timeplus Proton serves as a high-performance real-time feature store, enabling you to compute and serve features with millisecond latency.
Use Case Overview
Real-time feature pipelines with Proton enable:
- ML Feature Computation: Calculate features in real-time as events arrive
- Low-latency Serving: Serve features to ML models with sub-second latency
- Windowed Aggregations: Compute time-based features (last 5 minutes, last hour, etc.)
- Historical Backfill: Support both real-time and batch feature computation
- Feature Store: Maintain current and historical feature values
Architecture
A typical real-time feature pipeline consists of:
Event Stream → Feature Computation → Feature Store → ML Model Serving
     ↓              (Proton)           (Proton)            ↓
 Raw Events    Materialized Views   Versioned State    Predictions
Tutorial: Real-time Fraud Detection
This example demonstrates building a real-time fraud detection feature pipeline using Proton.
Prerequisites
Start the fraud detection demo:
services:
  proton:
    image: d.timeplus.com/timeplus-io/proton:latest
  fraud-generator:
    image: timeplus/fraud:latest
  jupyter:
    image: jupyter/scipy-notebook:latest
    ports:
      - "8888:8888"
    volumes:
      - ./notebooks:/home/jovyan/work
Step 1: Create Transaction Stream
Capture incoming payment transactions:
CREATE EXTERNAL STREAM transactions (
transaction_id string,
user_id string,
merchant_id string,
amount float64,
currency string,
country string,
timestamp datetime64(3),
device_type string,
ip_address string
)
SETTINGS type='kafka',
brokers='kafka:9092',
topic='payment-transactions';
Step 2: Compute Real-time Features
Create materialized views to compute features as transactions arrive:
Feature 1: Transaction Velocity (count in last 5 minutes)
CREATE MATERIALIZED VIEW feature_tx_velocity AS
SELECT
user_id,
window_end AS feature_timestamp,
count() AS tx_count_5m,
sum(amount) AS tx_amount_5m,
max(amount) AS max_amount_5m,
count(DISTINCT merchant_id) AS unique_merchants_5m
FROM hop(transactions, 30s, 5m) -- 5 min window, update every 30s
GROUP BY user_id, window_end;
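To make the hopping-window behavior concrete, the following Python sketch lists every window that a single event falls into. It is an illustration only: `hop_window_ends` is a hypothetical helper, and it assumes windows are half-open intervals `[end - size, end)` whose ends are aligned to multiples of the slide interval since the Unix epoch, which may not match Proton's exact alignment rules.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def hop_window_ends(event_time, slide, size):
    """Return the end time of every hopping window that contains event_time.

    Assumes windows are [end - size, end) with ends aligned to multiples
    of `slide` since the Unix epoch (an assumption for this sketch).
    """
    # First aligned window end strictly after the event.
    end = EPOCH + ((event_time - EPOCH) // slide + 1) * slide
    ends = []
    # The window still contains the event while its start is <= event_time.
    while end - size <= event_time:
        ends.append(end)
        end += slide
    return ends

# A transaction at 12:00:10 with a 5-minute window sliding every 30 seconds.
ends = hop_window_ends(
    datetime(2024, 1, 1, 12, 0, 10),
    slide=timedelta(seconds=30),
    size=timedelta(minutes=5),
)
print(len(ends), ends[0].time(), ends[-1].time())
```

Under these assumptions each transaction contributes to ten overlapping windows, which is why `tx_count_5m` is refreshed every 30 seconds.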
Feature 2: Geographic Velocity (transactions from different countries)
CREATE MATERIALIZED VIEW feature_geo_velocity AS
SELECT
user_id,
window_end AS feature_timestamp,
count(DISTINCT country) AS unique_countries_1h,
count(DISTINCT ip_address) AS unique_ips_1h
FROM hop(transactions, 5m, 1h)
GROUP BY user_id, window_end;
Feature 3: Amount Anomalies (compared to user’s average)
CREATE MATERIALIZED VIEW feature_amount_stats AS
SELECT
user_id,
window_end AS feature_timestamp,
avg(amount) AS avg_amount_24h,
stddev_pop(amount) AS stddev_amount_24h,
quantile(0.95)(amount) AS p95_amount_24h
FROM hop(transactions, 1h, 24h)
GROUP BY user_id, window_end;
Feature 4: Merchant Risk Score
CREATE MATERIALIZED VIEW feature_merchant_risk AS
SELECT
merchant_id,
window_end AS feature_timestamp,
count() AS merchant_tx_count_1h,
count(DISTINCT user_id) AS unique_users_1h,
avg(amount) AS merchant_avg_amount_1h
FROM hop(transactions, 5m, 1h)
GROUP BY merchant_id, window_end;
Step 3: Create the Feature Store
Maintain current feature values for each user in a versioned key-value stream:
CREATE STREAM feature_store (
user_id string,
feature_timestamp datetime64(3),
-- Velocity features
tx_count_5m uint32,
tx_amount_5m float64,
max_amount_5m float64,
unique_merchants_5m uint32,
-- Geographic features
unique_countries_1h uint32,
unique_ips_1h uint32,
-- Amount statistics
avg_amount_24h float64,
stddev_amount_24h float64,
p95_amount_24h float64
)
PRIMARY KEY user_id
SETTINGS mode='versioned_kv', version_column='feature_timestamp';
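The idea behind a versioned key-value store can be sketched in a few lines of Python: one row is kept per primary key, and an incoming row replaces the stored one only if its version is at least as new. This is a conceptual model, not Proton's implementation; `apply_update` is a hypothetical helper, and the tie-breaking rule (equal versions overwrite) is an assumption.

```python
def apply_update(store, row, key="user_id", version="feature_timestamp"):
    """Keep only the newest row per key, judged by the version column."""
    current = store.get(row[key])
    # Assumption: a row with an equal or newer version replaces the stored one.
    if current is None or row[version] >= current[version]:
        store[row[key]] = row
    return store

store = {}
apply_update(store, {"user_id": "u1", "feature_timestamp": 100, "tx_count_5m": 3})
apply_update(store, {"user_id": "u1", "feature_timestamp": 130, "tx_count_5m": 5})
# A late-arriving, older row does not overwrite the newer value.
apply_update(store, {"user_id": "u1", "feature_timestamp": 110, "tx_count_5m": 4})
print(store["u1"]["tx_count_5m"])
```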
Populate the feature store by joining computed features:
CREATE MATERIALIZED VIEW populate_feature_store INTO feature_store AS
SELECT
vel.user_id,
vel.feature_timestamp,
vel.tx_count_5m,
vel.tx_amount_5m,
vel.max_amount_5m,
vel.unique_merchants_5m,
geo.unique_countries_1h,
geo.unique_ips_1h,
stats.avg_amount_24h,
stats.stddev_amount_24h,
stats.p95_amount_24h
FROM feature_tx_velocity AS vel
LEFT JOIN feature_geo_velocity AS geo ON vel.user_id = geo.user_id AND vel.feature_timestamp = geo.feature_timestamp
LEFT JOIN feature_amount_stats AS stats ON vel.user_id = stats.user_id AND vel.feature_timestamp = stats.feature_timestamp;
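Note that the three feature views emit on different slide intervals (30s, 5m, and 1h), so an equality join on `feature_timestamp` can only match where window ends coincide; the `LEFT JOIN` leaves the remaining rows with NULL geographic or amount features. The rough Python sketch below estimates how often a match occurs. It assumes window ends are aligned to multiples of each slide interval, and `aligned_fraction` is a hypothetical helper introduced for illustration.

```python
from math import gcd

def aligned_fraction(base_slide_s: int, other_slide_s: int) -> float:
    """Fraction of base-view window ends that coincide with a window end
    of the other view, assuming both are aligned to their slide interval."""
    lcm = base_slide_s * other_slide_s // gcd(base_slide_s, other_slide_s)
    return base_slide_s / lcm

geo_match = aligned_fraction(30, 300)     # velocity (30s) vs. geo (5m)
stats_match = aligned_fraction(30, 3600)  # velocity (30s) vs. amount stats (1h)
print(geo_match, stats_match)
```

If every row needs all feature groups populated, one option is to give the views a common slide interval, at the cost of recomputing the larger windows more often.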
Step 4: Serve Features to ML Models
Query features for a specific user in real-time:
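-- Get the latest features for a user.
-- table() runs a bounded query over stored data instead of an unbounded streaming query.
SELECT *
FROM table(feature_store)
WHERE user_id = 'user_12345'
ORDER BY feature_timestamp DESC
LIMIT 1;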
Or use the Python driver for model serving:
from proton_driver import client

# Connect over Proton's native protocol port
proton = client.Client(host='localhost', port=8463)

def get_features(user_id):
    """Return the latest feature row for user_id as a tuple, or None."""
    query = """
        SELECT
            tx_count_5m,
            tx_amount_5m,
            unique_countries_1h,
            avg_amount_24h,
            stddev_amount_24h
        FROM table(feature_store)
        WHERE user_id = %(user_id)s
        ORDER BY feature_timestamp DESC
        LIMIT 1
    """
    # Pass user_id as a query parameter instead of formatting it into the SQL
    rows = proton.execute(query, {'user_id': user_id})
    return rows[0] if rows else None

# Use in ML inference (fraud_model is assumed to be a model trained elsewhere)
features = get_features('user_12345')
if features:
    prediction = fraud_model.predict([features])
    print(f"Fraud prediction: {prediction}")
Step 5: Create Fraud Detection Logic
Combine features to detect potential fraud:
CREATE MATERIALIZED VIEW fraud_alerts AS
SELECT
t.transaction_id,
t.user_id,
t.amount,
t.timestamp,
f.tx_count_5m,
f.unique_countries_1h,
f.avg_amount_24h,
f.stddev_amount_24h,
-- Flag suspicious patterns
CASE
WHEN f.tx_count_5m > 10 THEN 'high_velocity'
WHEN f.unique_countries_1h > 3 THEN 'geo_anomaly'
WHEN t.amount > f.avg_amount_24h + 3 * f.stddev_amount_24h THEN 'amount_anomaly'
WHEN t.amount > 5 * f.p95_amount_24h THEN 'extreme_amount'
ELSE 'normal'
END AS risk_flag
FROM transactions AS t
JOIN feature_store AS f ON t.user_id = f.user_id
WHERE risk_flag != 'normal';
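The `CASE` expression evaluates its branches in order, so a transaction receives only the first flag it matches. The Python sketch below mirrors that rule order so the thresholds can be unit-tested outside the database; `risk_flag` is a hypothetical helper and the feature dictionary layout is an assumption.

```python
def risk_flag(amount, f):
    """Mirror the CASE expression: the first matching rule wins."""
    if f["tx_count_5m"] > 10:
        return "high_velocity"
    if f["unique_countries_1h"] > 3:
        return "geo_anomaly"
    if amount > f["avg_amount_24h"] + 3 * f["stddev_amount_24h"]:
        return "amount_anomaly"
    if amount > 5 * f["p95_amount_24h"]:
        return "extreme_amount"
    return "normal"

features = {
    "tx_count_5m": 2,
    "unique_countries_1h": 1,
    "avg_amount_24h": 50.0,
    "stddev_amount_24h": 10.0,
    "p95_amount_24h": 70.0,
}
print(risk_flag(60.0, features))  # below every threshold
print(risk_flag(90.0, features))  # more than 3 standard deviations above the mean
```

Because of the ordering, a user with high velocity and an unusual amount is reported only as `high_velocity`; emitting one boolean column per rule would preserve all signals.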
Advanced Feature Patterns
Session-based Features
Compute features within user sessions:
CREATE MATERIALIZED VIEW feature_session AS
SELECT
user_id,
window_start AS session_start,
window_end AS session_end,
count() AS events_per_session,
max(timestamp) - min(timestamp) AS session_duration_ms,
count(DISTINCT page_url) AS unique_pages_visited
FROM session(clickstream, 30m) -- 30 min session timeout
PARTITION BY user_id
GROUP BY user_id, window_start, window_end;
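Session windows close after a period of inactivity rather than at fixed boundaries. A minimal Python sketch of that grouping for one user's events follows; `sessionize` is a hypothetical helper, and treating a gap of exactly 30 minutes as a new session is an assumption.

```python
from datetime import datetime, timedelta

def sessionize(timestamps, idle=timedelta(minutes=30)):
    """Group one user's event times into sessions split by idle gaps."""
    sessions = []
    for ts in sorted(timestamps):
        # Extend the current session if the gap since its last event is short enough.
        if sessions and ts - sessions[-1][-1] < idle:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])
    return sessions

events = [
    datetime(2024, 1, 1, 9, 0),
    datetime(2024, 1, 1, 9, 10),
    datetime(2024, 1, 1, 9, 55),  # 45 minutes after the previous event
    datetime(2024, 1, 1, 10, 5),
]
sessions = sessionize(events)
print([(s[0].time(), s[-1].time(), len(s)) for s in sessions])
```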
Ratio and Percentage Features
CREATE MATERIALIZED VIEW feature_ratios AS
SELECT
user_id,
window_end AS feature_timestamp,
count_if(action = 'purchase') AS purchases,
count_if(action = 'view') AS views,
purchases * 1.0 / nullif(views, 0) AS purchase_rate,
count_if(declined = true) * 100.0 / count() AS decline_rate_pct
FROM hop(user_events, 1h, 24h)
GROUP BY user_id, window_end;
Lag Features (change from previous window)
CREATE MATERIALIZED VIEW feature_trends AS
SELECT
user_id,
window_end,
tx_amount,
lag(tx_amount, 1) OVER (PARTITION BY user_id ORDER BY window_end) AS prev_amount,
tx_amount - prev_amount AS amount_change,
(tx_amount - prev_amount) * 100.0 / nullif(prev_amount, 0) AS amount_change_pct
FROM (
SELECT user_id, window_end, sum(amount) AS tx_amount
FROM tumble(transactions, 1h)
GROUP BY user_id, window_end
);
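The same lag arithmetic can be checked in Python on a list of per-window totals for one user. This sketch uses `None` where no previous window exists and where the previous amount is zero (mirroring `nullif`); how the SQL `lag` function represents the first row is not covered here, so treat that part as an assumption. `window_trends` is a hypothetical helper.

```python
def window_trends(totals):
    """totals: list of (window_end, tx_amount) for one user, sorted by window_end.

    Returns (window_end, tx_amount, prev_amount, amount_change, amount_change_pct).
    """
    rows = []
    prev = None
    for window_end, amount in totals:
        change = None if prev is None else amount - prev
        # Avoid dividing by zero or by a missing previous value.
        pct = None if not prev else change * 100.0 / prev
        rows.append((window_end, amount, prev, change, pct))
        prev = amount
    return rows

trends = window_trends([(1, 100.0), (2, 150.0), (3, 0.0), (4, 20.0)])
for row in trends:
    print(row)
```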
Historical Backfill Support
Proton supports both real-time and historical feature computation:
Batch Feature Computation
-- Compute features from historical data
INSERT INTO feature_store (user_id, feature_timestamp, tx_count_5m, tx_amount_5m)
SELECT
user_id,
timestamp AS feature_timestamp,
count() OVER (PARTITION BY user_id ORDER BY timestamp RANGE BETWEEN INTERVAL 5 MINUTE PRECEDING AND CURRENT ROW) AS tx_count_5m,
sum(amount) OVER (PARTITION BY user_id ORDER BY timestamp RANGE BETWEEN INTERVAL 5 MINUTE PRECEDING AND CURRENT ROW) AS tx_amount_5m
FROM historical_transactions;
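The trailing 5-minute window in the query above can be reproduced offline to validate backfilled values. The Python sketch below keeps a deque of recent events per user and evicts those older than five minutes; `backfill_velocity` is a hypothetical helper, and it assumes events are sorted with distinct timestamps (SQL `RANGE` frames also include rows that share the current row's timestamp).

```python
from collections import deque
from datetime import datetime, timedelta

def backfill_velocity(events, window=timedelta(minutes=5)):
    """events: (timestamp, amount) pairs for one user, sorted by timestamp.

    Returns (timestamp, tx_count_5m, tx_amount_5m) for each event.
    """
    recent = deque()
    total = 0.0
    rows = []
    for ts, amount in events:
        recent.append((ts, amount))
        total += amount
        # Evict events that fall before the inclusive lower bound ts - window.
        while recent[0][0] < ts - window:
            _, old_amount = recent.popleft()
            total -= old_amount
        rows.append((ts, len(recent), total))
    return rows

history = [
    (datetime(2024, 1, 1, 12, 0), 10.0),
    (datetime(2024, 1, 1, 12, 2), 20.0),
    (datetime(2024, 1, 1, 12, 6), 5.0),
    (datetime(2024, 1, 1, 12, 12), 1.0),
]
rows = backfill_velocity(history)
for row in rows:
    print(row)
```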
Point-in-time Correctness
Ensure features reflect what was known at prediction time:
CREATE STREAM feature_store_versioned (
user_id string,
feature_timestamp datetime64(3),
computation_timestamp datetime64(3) DEFAULT now64(),
features map(string, float64)
)
PRIMARY KEY (user_id, feature_timestamp);
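With every feature version retained, a point-in-time lookup returns the newest version whose timestamp is not later than the prediction time, so training data never sees values from the future. A minimal Python sketch of that as-of lookup follows; `features_as_of` is a hypothetical helper and integer timestamps are used only to keep the example short.

```python
from bisect import bisect_right

def features_as_of(versions, prediction_time):
    """versions: list of (feature_timestamp, features) sorted by timestamp.

    Returns the newest features with feature_timestamp <= prediction_time,
    or None if no version existed yet.
    """
    timestamps = [ts for ts, _ in versions]
    idx = bisect_right(timestamps, prediction_time)
    return versions[idx - 1][1] if idx else None

versions = [
    (100, {"tx_count_5m": 1.0}),
    (200, {"tx_count_5m": 4.0}),
    (300, {"tx_count_5m": 9.0}),
]
print(features_as_of(versions, 250))  # the version written at 200
```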
Integration with ML Frameworks
Python + scikit-learn
import pandas as pd
from proton_driver import client
from sklearn.ensemble import RandomForestClassifier
proton = client.Client()
# Fetch training data
query = """
SELECT
tx_count_5m,
tx_amount_5m,
unique_countries_1h,
avg_amount_24h,
is_fraud
FROM table(feature_store_labeled)
WHERE feature_timestamp > now64() - INTERVAL 30 DAY
"""
df = proton.query_dataframe(query)
X = df.drop('is_fraud', axis=1)
y = df['is_fraud']
model = RandomForestClassifier()
model.fit(X, y)
Real-time Inference
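def predict_fraud(transaction_id):
    # Fetch the same feature columns, in the same order, that the model was trained on
    query = """
        SELECT f.tx_count_5m, f.tx_amount_5m, f.unique_countries_1h, f.avg_amount_24h
        FROM transactions AS t
        JOIN feature_store AS f ON t.user_id = f.user_id
        WHERE t.transaction_id = %(transaction_id)s
    """
    # execute_iter yields rows as they arrive; take the first match
    rows = proton.execute_iter(query, {'transaction_id': transaction_id})
    features = next(iter(rows), None)
    if features is None:
        return False  # No features found for this transaction
    # Predict
    probability = model.predict_proba([list(features)])[0][1]
    return probability > 0.8  # Fraud threshold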
Use Appropriate Window Sizes
-- Efficient: Aligned windows
FROM tumble(stream, 1m) -- Non-overlapping
-- Less efficient: Small slide interval
FROM hop(stream, 1s, 5m) -- Creates many windows
-- Balance: Reasonable slide interval
FROM hop(stream, 30s, 5m) -- Good balance
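The cost difference comes from how many windows each event must update, which is roughly the window size divided by the slide interval. A quick Python calculation for the three cases above, using hypothetical helpers and an assumed rate of 1,000 events per second:

```python
def windows_per_event(slide_s: int, size_s: int) -> int:
    """Number of overlapping windows one event belongs to (ceiling division)."""
    return -(-size_s // slide_s)

def state_updates_per_second(events_per_second: int, slide_s: int, size_s: int) -> int:
    """Aggregate-state updates per second for a given event rate."""
    return events_per_second * windows_per_event(slide_s, size_s)

tumble_1m = windows_per_event(60, 60)      # tumble(stream, 1m)
hop_1s_5m = windows_per_event(1, 300)      # hop(stream, 1s, 5m)
hop_30s_5m = windows_per_event(30, 300)    # hop(stream, 30s, 5m)
print(tumble_1m, hop_1s_5m, hop_30s_5m)
print(state_updates_per_second(1000, 1, 300))
```

This is a back-of-the-envelope model; the engine may optimize overlapping windows internally, so measure before tuning.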
Materialize Common Joins
-- Instead of joining in every query
CREATE MATERIALIZED VIEW feature_combined AS
SELECT
t.user_id,
t.timestamp,
f1.feature_a,
f2.feature_b,
f3.feature_c
FROM transactions t
JOIN feature_view_1 f1 USING (user_id)
JOIN feature_view_2 f2 USING (user_id)
JOIN feature_view_3 f3 USING (user_id);
Use versioned_kv for Latest Values
CREATE STREAM latest_features (
user_id string,
updated_at datetime64(3),
features map(string, float64)
)
PRIMARY KEY user_id
SETTINGS mode='versioned_kv', version_column='updated_at';
Query the latest version without ORDER BY + LIMIT:
SELECT * FROM table(latest_features) WHERE user_id = 'user_123';
Monitoring Feature Quality
Track Feature Freshness
SELECT
user_id,
feature_timestamp,
now64() - feature_timestamp AS feature_age_ms
FROM table(feature_store)
WHERE feature_age_ms > 60000 -- Alert if features >1 min old
ORDER BY feature_age_ms DESC;
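The same freshness rule can run inside the serving layer, for example to fall back to default features when a user's row is stale. The Python sketch below applies the one-minute threshold to rows already fetched from the feature store; `stale_users` is a hypothetical helper and the row layout is an assumption.

```python
from datetime import datetime, timedelta

def stale_users(rows, now, max_age=timedelta(minutes=1)):
    """rows: iterable of (user_id, feature_timestamp).

    Returns (user_id, feature_age_ms) for features older than max_age,
    oldest first.
    """
    stale = []
    for user_id, feature_timestamp in rows:
        age = now - feature_timestamp
        if age > max_age:
            stale.append((user_id, int(age.total_seconds() * 1000)))
    return sorted(stale, key=lambda r: r[1], reverse=True)

now = datetime(2024, 1, 1, 12, 0, 0)
rows = [
    ("u1", datetime(2024, 1, 1, 11, 59, 30)),  # 30 seconds old: fresh
    ("u2", datetime(2024, 1, 1, 11, 58, 0)),   # 2 minutes old
    ("u3", datetime(2024, 1, 1, 11, 50, 0)),   # 10 minutes old
]
alerts = stale_users(rows, now)
print(alerts)
```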
Feature Distribution Monitoring
SELECT
quantile(0.01)(tx_count_5m) AS p01,
quantile(0.50)(tx_count_5m) AS p50,
quantile(0.99)(tx_count_5m) AS p99,
avg(tx_count_5m) AS mean,
stddev_pop(tx_count_5m) AS stddev
FROM feature_store
WHERE feature_timestamp > now64() - INTERVAL 1 HOUR;