Timeplus Proton excels at building real-time ETL (Extract, Transform, Load) pipelines that can continuously process streaming data. This tutorial demonstrates how to build an end-to-end ETL pipeline that reads from Kafka/Redpanda, applies transformations, and writes to ClickHouse.

Use Case Overview

Real-time ETL pipelines are essential for:
  • Data Integration: Moving data between systems in real-time
  • Data Masking: Protecting sensitive information like IP addresses or PII
  • Format Conversion: Converting between different data formats (JSON, Avro, CSV)
  • Data Enrichment: Adding calculated fields or joining with reference data
  • Continuous Replication: Keeping downstream systems synchronized

Architecture

A typical real-time ETL pipeline with Proton consists of:
  1. Source: External stream reading from Kafka/Redpanda
  2. Transform: SQL-based transformations (filter, map, aggregate)
  3. Sink: External stream or table writing to destination (ClickHouse, Kafka, etc.)
Kafka/Redpanda → External Stream → Materialized View → External Table/Stream → ClickHouse

Tutorial: Read from Kafka, Transform, Write to ClickHouse

This example uses the Owl Shop ecommerce simulator to generate realistic frontend events and demonstrates masking sensitive data.

Prerequisites

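Start the demo stack with Docker Compose. The listing below is abbreviated to the four services and their images; a working compose file also needs each service's ports, startup command, and connection settings: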
services:
  proton:
    image: d.timeplus.com/timeplus-io/proton:latest
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
  owl-shop:
    image: quay.io/cloudhut/owl-shop:latest
  clickhouse:
    image: clickhouse/clickhouse-server:latest

Step 1: Create Source External Stream

First, create an external stream to read raw data from Kafka:
CREATE EXTERNAL STREAM frontend_events(raw string)
SETTINGS type='kafka',
         brokers='redpanda:9092',
         topic='owlshop-frontend-events';
Verify data is flowing:
SELECT * FROM frontend_events LIMIT 5;
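A streaming query on a Kafka external stream normally returns only events that arrive after the query starts, so a quiet topic can look empty. The following is a minimal sketch for replaying what is already stored in the topic. It assumes your Proton version supports the `seek_to` query setting on Kafka external streams; check the external stream documentation for your release.

```sql
-- Assumption: the seek_to query setting is available in your Proton version.
-- Reads from the earliest retained offset instead of only new events.
SELECT raw
FROM frontend_events
LIMIT 5
SETTINGS seek_to='earliest';
```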

Step 2: Prepare ClickHouse Destination

In your ClickHouse instance, create the target table:
CREATE TABLE events
(
    _tp_time DateTime64(3),
    url String,
    method String,
    ip String
)
ENGINE=MergeTree()
PRIMARY KEY (_tp_time, url);

Step 3: Create ClickHouse External Table in Proton

Connect Proton to ClickHouse using an external table:
CREATE EXTERNAL TABLE ch_events
SETTINGS type='clickhouse',
         address='clickhouse:9000',
         table='events';

Step 4: Build the ETL Pipeline with Materialized View

Create a materialized view that continuously:
  1. Reads from the source stream
  2. Extracts JSON fields
  3. Masks the IP address using MD5 hash
  4. Writes to ClickHouse
CREATE MATERIALIZED VIEW mv_frontend_etl INTO ch_events AS
    SELECT now64() AS _tp_time,
           raw:requestedUrl AS url,
           raw:method AS method,
           lower(hex(md5(raw:ipAddress))) AS ip
    FROM frontend_events;
The pipeline is now running! Data flows continuously from Kafka through Proton to ClickHouse.
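The view's SELECT is an ordinary streaming query, so the same expressions can be run ad hoc to inspect the extracted fields and the masked IP. This sketch uses only the expressions from the view above and adds the unmasked address for comparison:

```sql
-- Preview of the transformation; nothing is written to ClickHouse.
SELECT raw:requestedUrl AS url,
       raw:method AS method,
       raw:ipAddress AS original_ip,          -- shown only for comparison
       lower(hex(md5(raw:ipAddress))) AS ip   -- 32-character hex digest
FROM frontend_events
LIMIT 5;
```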

Step 5: Query Results in ClickHouse

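Verify data is arriving. In ClickHouse, query the events table created in Step 2 (ch_events is the name of the external table inside Proton, where the same queries also work):
SELECT * FROM events ORDER BY _tp_time DESC LIMIT 10;
Run analytics directly in ClickHouse:
SELECT method, count() AS cnt
FROM events
GROUP BY method;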
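As a quick sanity check on the masking, the following sketch is meant to run in ClickHouse against the `events` table from Step 2. An MD5 digest rendered as hex is always 32 characters, so the query counts rows whose `ip` column does not look like a lowercase hex digest.

```sql
-- Counts rows where ip is not a 32-character lowercase hex string.
SELECT count() AS unmasked_rows
FROM events
WHERE NOT match(ip, '^[0-9a-f]{32}$');
```

If every row reached the table through the materialized view, the count should stay at zero.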

Alternative: Kafka-to-Kafka ETL

You can also write transformed data back to Kafka:
-- Create target stream
CREATE EXTERNAL STREAM masked_events(
    _tp_time datetime64(3),
    url string,
    method string,
    ip string
)
SETTINGS type='kafka',
         brokers='redpanda:9092',
         topic='masked-fe-events',
         data_format='JSONEachRow';

-- ETL pipeline writing to Kafka
CREATE MATERIALIZED VIEW mv_mask_to_kafka INTO masked_events AS
    SELECT now64() AS _tp_time,
           raw:requestedUrl AS url,
           raw:method AS method,
           lower(hex(md5(raw:ipAddress))) AS ip
    FROM frontend_events;

Common Transformation Patterns

JSON Parsing and Extraction

-- Extract nested JSON fields
SELECT raw:user.id AS user_id,
       raw:user.email AS email,
       raw:items[1].product AS product
FROM source_stream;

Data Type Conversion

-- Convert string timestamps to datetime
SELECT parse_datetime64_best_effort(raw:timestamp) AS event_time,
       raw:amount::float64 AS amount,
       raw:count::int32 AS count
FROM source_stream;

Filtering and Enrichment

-- Filter and add calculated fields
SELECT _tp_time,
       user_id,
       amount,
       amount * 0.1 AS tax,
       if(amount > 100, 'high', 'normal') AS priority
FROM source_stream
WHERE status = 'completed';

Aggregation in ETL

-- Aggregate before writing to destination
CREATE MATERIALIZED VIEW mv_aggregated INTO target AS
    SELECT window_start AS _tp_time,
           user_id,
           count() AS event_count,
           sum(amount) AS total_amount
    FROM tumble(source_stream, 1m)
    GROUP BY window_start, user_id;
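The view above writes into a stream named `target`, which the snippet does not define. Below is a sketch of one possible definition, to be created before the materialized view. The column types are assumptions: `user_id` is taken to be a string and `amount` a float64. Proton adds the `_tp_time` column to a stream automatically, and the view sets it to the window start.

```sql
-- Hypothetical destination for mv_aggregated; column types are assumptions.
CREATE STREAM target
(
    user_id string,
    event_count uint64,     -- count() returns an unsigned 64-bit integer
    total_amount float64    -- sum() over float64 amounts
);
```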

Performance Considerations

Batching

Control batch size for external table writes:
SETTINGS max_insert_block_size = 100000;

Parallel Processing

Enable parallel inserts for better throughput:
SETTINGS max_insert_threads = 8;

Monitoring

Check materialized view progress:
SELECT * FROM system.materialized_views;

Error Handling

Skip Invalid Records

CREATE MATERIALIZED VIEW mv_with_validation INTO target AS
    SELECT _tp_time,
           user_id,
           amount
    FROM source_stream
    WHERE is_valid_json(raw) AND raw:amount::float64 > 0;

Dead Letter Queue

-- Write invalid records to separate stream
CREATE MATERIALIZED VIEW mv_invalid INTO error_stream AS
    SELECT raw, _tp_time, 'invalid_json' AS error
    FROM source_stream
    WHERE NOT is_valid_json(raw);
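The dead-letter view needs an `error_stream` to write into, which the snippet leaves undefined. This sketch assumes a native Proton stream is an acceptable place to park bad records; the column names follow the view's SELECT list, and `_tp_time` is added by Proton automatically.

```sql
-- Hypothetical dead-letter stream; create it before mv_invalid.
CREATE STREAM error_stream
(
    raw string,     -- the original, unparsed payload
    error string    -- reason tag set by the view, e.g. 'invalid_json'
);

-- Historical (non-streaming) count of parked records per reason.
SELECT error, count() AS records
FROM table(error_stream)
GROUP BY error;
```

Wrapping the stream in `table()` makes the second query return once over the stored rows instead of running continuously.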

Real-World Example: AWS MSK Integration

For production deployments with AWS MSK (Managed Streaming for Apache Kafka) using IAM authentication, connect to the brokers' IAM listener (port 9098 inside the VPC) rather than the plaintext port 9092:
CREATE EXTERNAL STREAM msk_source(raw string)
SETTINGS type='kafka',
         brokers='b-1.msk-cluster.kafka.us-east-1.amazonaws.com:9098,b-2.msk-cluster.kafka.us-east-1.amazonaws.com:9098',
         topic='production-events',
         security_protocol='SASL_SSL',
         sasl_mechanism='AWS_MSK_IAM';

