Timeplus Proton excels at building real-time ETL (Extract, Transform, Load) pipelines that can continuously process streaming data. This tutorial demonstrates how to build an end-to-end ETL pipeline that reads from Kafka/Redpanda, applies transformations, and writes to ClickHouse.
Use Case Overview
Real-time ETL pipelines are essential for:
- Data Integration: Moving data between systems in real time
- Data Masking: Protecting sensitive information like IP addresses or PII
- Format Conversion: Converting between different data formats (JSON, Avro, CSV)
- Data Enrichment: Adding calculated fields or joining with reference data
- Continuous Replication: Keeping downstream systems synchronized
Architecture
A typical real-time ETL pipeline with Proton consists of:
- Source: an external stream that reads from Kafka/Redpanda
- Transform: a materialized view that applies SQL transformations (filter, map, aggregate)
- Sink: an external stream or external table that writes to the destination (ClickHouse, Kafka, etc.)
Kafka/Redpanda → External Stream → Materialized View → External Table → ClickHouse (or External Stream → Kafka)
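As a rough mental model, the three stages behave like chained generators. The source yields raw messages, the transform reshapes each one, and the sink writes the result. The Python sketch below is only an analogy. `source`, `transform`, `sink`, and the in-memory `destination` list are hypothetical stand-ins, not Proton APIs.

```python
import json
from typing import Iterable, Iterator


def source(messages: Iterable[str]) -> Iterator[str]:
    """Stand-in for the external stream: yields raw message strings one by one."""
    for message in messages:
        yield message


def transform(raws: Iterable[str]) -> Iterator[dict]:
    """Stand-in for the materialized view: parses each raw string and keeps two fields."""
    for raw in raws:
        event = json.loads(raw)
        yield {"url": event.get("requestedUrl", ""), "method": event.get("method", "")}


def sink(rows: Iterable[dict], destination: list) -> None:
    """Stand-in for the external table/stream: appends each row to a destination list."""
    for row in rows:
        destination.append(row)


# Hypothetical sample messages; real events come from the Owl Shop topic.
messages = ['{"requestedUrl": "/cart", "method": "GET"}',
            '{"requestedUrl": "/checkout", "method": "POST"}']
destination: list = []
sink(transform(source(messages)), destination)
print(destination)
```

Each stage lazily pulls records from the one before it, so this chain stops when the list is exhausted. A real stream, unlike this list, never ends.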
This example uses the Owl Shop ecommerce simulator to generate realistic frontend events and demonstrates masking sensitive data.
Prerequisites
Start the demo stack with Docker Compose. The outline below lists the services only; port mappings, broker settings, and other configuration are omitted:
services:
  proton:
    image: d.timeplus.com/timeplus-io/proton:latest
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
  owl-shop:
    image: quay.io/cloudhut/owl-shop:latest
  clickhouse:
    image: clickhouse/clickhouse-server:latest
Step 1: Create Source External Stream
First, create an external stream that reads each Kafka message into a single raw string column:
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka',
brokers='redpanda:9092',
topic='owlshop-frontend-events';
Verify that data is flowing:
SELECT * FROM frontend_events LIMIT 5;
Step 2: Prepare ClickHouse Destination
In your ClickHouse instance, create the target table:
CREATE TABLE events
(
_tp_time DateTime64(3),
url String,
method String,
ip String
)
ENGINE=MergeTree()
PRIMARY KEY (_tp_time, url);
Step 3: Create ClickHouse External Table in Proton
Connect Proton to ClickHouse using an external table:
CREATE EXTERNAL TABLE ch_events
SETTINGS type='clickhouse',
address='clickhouse:9000',
table='events';
Step 4: Build the ETL Pipeline with Materialized View
Create a materialized view that continuously:
- Reads from the source stream
- Extracts JSON fields
- Masks the IP address using an MD5 hash
- Writes to ClickHouse
CREATE MATERIALIZED VIEW mv_frontend_etl INTO ch_events AS
SELECT now64() AS _tp_time,
raw:requestedUrl AS url,
raw:method AS method,
lower(hex(md5(raw:ipAddress))) AS ip
FROM frontend_events;
The pipeline is now running! Data flows continuously from Kafka through Proton to ClickHouse.
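For each event, the view computes roughly what the following Python sketch does. It assumes each Kafka message is a JSON object with `requestedUrl`, `method`, and `ipAddress` keys, which are the fields the view reads. Treating missing keys as empty strings is an assumption about the extraction behavior. For UTF-8 input, `hashlib.md5(...).hexdigest()` produces the same 32-character lowercase hex string as `lower(hex(md5(...)))`.

```python
import hashlib
import json
from datetime import datetime, timezone


def mask_ip(ip: str) -> str:
    """Lowercase hex MD5 digest, analogous to lower(hex(md5(ip))) in the view."""
    return hashlib.md5(ip.encode("utf-8")).hexdigest()


def transform_event(raw: str) -> dict:
    """Mirror of the view's SELECT list: processing time, url, method, masked ip."""
    event = json.loads(raw)
    return {
        # now64() stamps each row with the current (processing) time
        "_tp_time": datetime.now(timezone.utc),
        # Missing keys become "" here (an assumption of this sketch)
        "url": event.get("requestedUrl", ""),
        "method": event.get("method", ""),
        "ip": mask_ip(event.get("ipAddress", "")),
    }


row = transform_event('{"requestedUrl": "/home", "method": "GET", "ipAddress": "203.0.113.7"}')
print(row["url"], row["method"], row["ip"])
```

An unsalted MD5 of an IPv4 address can be reversed by hashing all 2^32 possible addresses. This step therefore pseudonymizes the data rather than anonymizing it. If that distinction matters for your use case, a keyed hash is the stronger choice.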
Step 5: Query Results in ClickHouse
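Verify that data is arriving in ClickHouse. Inside ClickHouse the table is named events; ch_events is only the name of the external table in Proton:
SELECT * FROM events ORDER BY _tp_time DESC LIMIT 10;
Run analytics directly in ClickHouse:
SELECT method, count() AS cnt
FROM events
GROUP BY method;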
Alternative: Kafka-to-Kafka ETL
You can also write transformed data back to Kafka:
-- Create target stream
CREATE EXTERNAL STREAM masked_events(
_tp_time datetime64(3),
url string,
method string,
ip string
)
SETTINGS type='kafka',
brokers='redpanda:9092',
topic='masked-fe-events',
data_format='JSONEachRow';
-- ETL pipeline writing to Kafka
CREATE MATERIALIZED VIEW mv_mask_to_kafka INTO masked_events AS
SELECT now64() AS _tp_time,
raw:requestedUrl AS url,
raw:method AS method,
lower(hex(md5(raw:ipAddress))) AS ip
FROM frontend_events;
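JSONEachRow is a newline-delimited format in which each row is one JSON object keyed by column name. The sketch below illustrates only that encoding. The timestamp text format and the sample values are assumptions, and the sketch does not show how Proton groups rows into Kafka messages. `to_json_each_row` is a hypothetical name.

```python
import hashlib
import json


def to_json_each_row(rows: list) -> str:
    """Encode rows as newline-delimited JSON: one object per row, keyed by column name."""
    return "\n".join(json.dumps(row, separators=(",", ":")) for row in rows)


masked_ip = hashlib.md5(b"203.0.113.7").hexdigest()
rows = [
    # Hypothetical rows shaped like the masked_events columns
    {"_tp_time": "2024-01-01 00:00:00.000", "url": "/home", "method": "GET", "ip": masked_ip},
    {"_tp_time": "2024-01-01 00:00:01.000", "url": "/cart", "method": "POST", "ip": masked_ip},
]
print(to_json_each_row(rows))
```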
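JSON Parsing and Extraction
The snippets in this section and the ones that follow use a placeholder stream, source_stream, and illustrative column names; substitute your own.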
-- Extract nested JSON fields
SELECT raw:user.id AS user_id,
raw:user.email AS email,
raw:items[1].product AS product
FROM source_stream;
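The Python sketch below does a dotted-path lookup, which gives some intuition for what `raw:user.id` and `raw:user.email` select. The helper name `extract_path`, the empty-string result for missing keys, and the JSON re-encoding of non-string values are all assumptions of this sketch. Array indexing such as `items[1]` is not handled.

```python
import json
from typing import Any


def extract_path(raw: str, path: str) -> str:
    """Follow a dotted path such as 'user.id' through parsed JSON.

    Returns "" when a key is missing (an assumed behavior for this sketch);
    non-string leaf values are re-encoded as JSON text.
    """
    node: Any = json.loads(raw)
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return ""
        node = node[key]
    return node if isinstance(node, str) else json.dumps(node)


raw = '{"user": {"id": 42, "email": "a@example.com"}}'
print(extract_path(raw, "user.id"), extract_path(raw, "user.email"), repr(extract_path(raw, "user.name")))
```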
Data Type Conversion
-- Convert string timestamps to datetime
SELECT parse_datetime64_best_effort(raw:timestamp) AS event_time,
raw:amount::float64 AS amount,
raw:count::int32 AS count
FROM source_stream;
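The conversions correspond roughly to the Python below. `datetime.fromisoformat` is a much stricter stand-in for `parse_datetime64_best_effort`, which, as its name suggests, is meant to accept a range of formats. The field names and sample values are hypothetical.

```python
import json
from datetime import datetime


def convert(raw: str) -> dict:
    """Cast extracted string fields to typed values, like the SELECT above."""
    event = json.loads(raw)
    return {
        # fromisoformat accepts ISO 8601 text only; far stricter than a best-effort parser
        "event_time": datetime.fromisoformat(event["timestamp"]),
        "amount": float(event["amount"]),
        "count": int(event["count"]),
    }


row = convert('{"timestamp": "2024-05-01T12:30:00", "amount": "19.99", "count": "3"}')
print(row)
```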
Filtering and Enrichment
-- Filter and add calculated fields
SELECT _tp_time,
user_id,
amount,
amount * 0.1 AS tax,
if(amount > 100, 'high', 'normal') AS priority
FROM source_stream
WHERE status = 'completed';
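The Python sketch below applies the same filter-then-enrich logic to an in-memory list of hypothetical sample rows. `_tp_time` is left out for brevity.

```python
def enrich(rows: list) -> list:
    """Keep completed rows and add tax and priority columns."""
    out = []
    for row in rows:
        if row["status"] != "completed":
            continue  # WHERE status = 'completed'
        amount = row["amount"]
        out.append({
            "user_id": row["user_id"],
            "amount": amount,
            "tax": amount * 0.1,  # amount * 0.1 AS tax
            "priority": "high" if amount > 100 else "normal",  # if(amount > 100, ...)
        })
    return out


rows = [
    {"user_id": "u1", "amount": 150.0, "status": "completed"},
    {"user_id": "u2", "amount": 20.0, "status": "completed"},
    {"user_id": "u3", "amount": 500.0, "status": "pending"},
]
print(enrich(rows))
```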
Aggregation in ETL
-- Aggregate before writing to destination
CREATE MATERIALIZED VIEW mv_aggregated INTO target AS
SELECT window_start AS _tp_time,
user_id,
count() AS event_count,
sum(amount) AS total_amount
FROM tumble(source_stream, 1m)
GROUP BY window_start, user_id;
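A tumbling window assigns each event to exactly one fixed-size, non-overlapping interval. The aggregation then runs once per (window_start, user_id) pair. The Python sketch below shows only that bucketing arithmetic over a finite list. It ignores when a streaming engine emits each window (watermarks, late events), and `tumble_aggregate` is a hypothetical name.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def tumble_aggregate(
    events: Iterable[Tuple[datetime, str, float]], window: timedelta
) -> Dict[Tuple[datetime, str], Tuple[int, float]]:
    """Group events into fixed, non-overlapping windows keyed by (window_start, user_id)."""
    epoch = datetime(1970, 1, 1)
    buckets = defaultdict(lambda: [0, 0.0])  # key -> [event_count, total_amount]
    for ts, user_id, amount in events:
        # Align each timestamp down to the start of its window
        window_start = epoch + ((ts - epoch) // window) * window
        bucket = buckets[(window_start, user_id)]
        bucket[0] += 1
        bucket[1] += amount
    return {key: (value[0], value[1]) for key, value in buckets.items()}


events = [
    (datetime(2024, 1, 1, 0, 0, 10), "u1", 10.0),
    (datetime(2024, 1, 1, 0, 0, 50), "u1", 5.0),
    (datetime(2024, 1, 1, 0, 1, 5), "u1", 7.0),
    (datetime(2024, 1, 1, 0, 0, 30), "u2", 1.0),
]
result = tumble_aggregate(events, timedelta(minutes=1))
print(result)
```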
Batching
Control batch size for external table writes:
SETTINGS max_insert_block_size = 100000;
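Conceptually, a block-size limit caps how many rows go into one insert. The generic chunking helper below illustrates that idea only. It is not how Proton implements `max_insert_block_size`, and `batched_rows` is a hypothetical name.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched_rows(rows: Iterable[T], max_block_size: int) -> Iterator[List[T]]:
    """Yield consecutive blocks holding at most max_block_size rows each."""
    if max_block_size < 1:
        raise ValueError("max_block_size must be positive")
    iterator = iter(rows)
    # Take up to max_block_size rows at a time until the input is exhausted
    while block := list(islice(iterator, max_block_size)):
        yield block


print(list(batched_rows(range(7), 3)))
```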
Parallel Processing
Enable parallel inserts for better throughput:
SETTINGS max_insert_threads = 8;
Monitoring
Check materialized view progress:
SELECT * FROM system.materialized_views;
Error Handling
Skip Invalid Records
CREATE MATERIALIZED VIEW mv_with_validation INTO target AS
SELECT _tp_time,
raw:user_id AS user_id,
raw:amount::float64 AS amount
FROM source_stream
WHERE is_valid_json(raw) AND raw:amount::float64 > 0;
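The Python sketch below applies the same validation rule. It treats a missing or non-numeric `amount` as invalid instead of raising an error. That is a choice made for the sketch, not documented Proton behavior, and `is_valid_record` is a hypothetical name.

```python
import json


def is_valid_record(raw: str) -> bool:
    """Accept only parseable JSON whose 'amount' converts to a positive number."""
    try:
        event = json.loads(raw)  # JSONDecodeError is a subclass of ValueError
        return float(event["amount"]) > 0
    except (ValueError, KeyError, TypeError):
        # Invalid JSON, missing key, non-object JSON, or non-numeric amount
        return False


print(is_valid_record('{"amount": "12.5"}'), is_valid_record("not json"))
```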
Dead Letter Queue
-- Write invalid records to separate stream
CREATE MATERIALIZED VIEW mv_invalid INTO error_stream AS
SELECT raw, _tp_time, 'invalid_json' AS error
FROM source_stream
WHERE NOT is_valid_json(raw);
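The validation view and the dead letter view are independent queries over the same stream, and their conditions are not exact complements. A record that is valid JSON but has a non-positive `amount` passes neither filter, so it is written to neither target. The single-pass Python sketch below mirrors only the JSON check used by `mv_invalid`; `route` is a hypothetical name.

```python
import json
from typing import List, Tuple


def route(raws: List[str]) -> Tuple[List[dict], List[dict]]:
    """Split raw messages into parsed records and dead-letter entries."""
    good: List[dict] = []
    dead_letters: List[dict] = []
    for raw in raws:
        try:
            good.append(json.loads(raw))
        except json.JSONDecodeError:
            # Keep the original payload plus a reason, like the error_stream columns
            dead_letters.append({"raw": raw, "error": "invalid_json"})
    return good, dead_letters


good, dead = route(['{"amount": 5}', "{oops"])
print(good, dead)
```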
Real-World Example: AWS MSK Integration
For production deployments on Amazon MSK (Managed Streaming for Apache Kafka) with IAM authentication, connect to the brokers' IAM listener on port 9098:
CREATE EXTERNAL STREAM msk_source(raw string)
SETTINGS type='kafka',
brokers='b-1.msk-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.msk-cluster.kafka.us-east-1.amazonaws.com:9098',
topic='production-events',
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM';
Next Steps