Change Data Capture (CDC) enables you to track and process changes from OLTP databases in real time. Timeplus Proton integrates with CDC tools like Debezium, so you can run real-time analytics on those changes and replicate data to downstream systems.
Use Case Overview
CDC with Proton enables:
- Real-time Database Replication: Sync changes from MySQL/PostgreSQL to ClickHouse or other databases
- Event-driven Architectures: Trigger downstream processes based on data changes
- Audit Logging: Track all modifications to database records
- Data Warehouse Synchronization: Keep analytics databases up-to-date
- Microservice Data Consistency: Propagate changes across service boundaries
Architecture
A typical CDC pipeline looks like this:
MySQL/PostgreSQL (source) → Debezium (CDC) → Kafka (transport) → Timeplus Proton (transform) → ClickHouse/Kafka/applications (destinations)
Components:
- Debezium: Captures changes from database transaction logs
- Kafka/Redpanda: Streams CDC events reliably
- Timeplus Proton: Transforms and routes CDC data with SQL
- Destination: ClickHouse, another database, or application
Tutorial: MySQL to ClickHouse Replication
This example demonstrates capturing MySQL changes and replicating them to ClickHouse via Proton.
Prerequisites
Start the CDC demo stack with the following docker-compose.yml:
version: '3.8'
services:
  mysql:
    image: debezium/example-mysql:latest
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Advertise an address that other containers can actually reach
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
  connect:
    image: debezium/connect:latest
    ports:
      # Kafka Connect REST API, used by the curl commands in this guide
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: redpanda:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
  proton:
    image: d.timeplus.com/timeplus-io/proton:latest
  clickhouse:
    image: clickhouse/clickhouse-server:latest
Then run: docker compose up -d
Step 1: Create ClickHouse Table
In ClickHouse, create the destination table:
CREATE TABLE customers
(
id Int32,
first_name String,
last_name String,
email String
)
ENGINE=MergeTree()
PRIMARY KEY (id);
Step 2: Create Debezium Connector
Register a Debezium MySQL connector with Kafka Connect:
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}'
This creates a connector that captures changes from the inventory database.
Step 3: Create External Stream in Proton
Read CDC events from Kafka:
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS type='kafka',
brokers='redpanda:9092',
topic='dbserver1.inventory.customers';
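Examine the CDC event structure. Kafka external streams read from the latest offset by default, so seek to the beginning to see existing events:
SELECT raw FROM customers_cdc LIMIT 1 SETTINGS seek_to='earliest';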
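Debezium CDC events contain:
- payload.op: Operation type (c=create, u=update, d=delete, r=read/snapshot)
- payload.before: Record state before the change (null for creates and snapshot reads)
- payload.after: Record state after the change (null for deletes)
- payload.ts_ms: Time, in epoch milliseconds, at which Debezium processed the event (the time of the change in the database is in payload.source.ts_ms)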
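To see how these fields fit together, the following Python sketch parses one Debezium JSON message the way the raw:payload.* expressions in this guide address it. The sample event is hand-written for illustration (not captured output), describe() is a hypothetical helper, and the sketch assumes the JSON converter's default envelope, where the change sits under payload.

```python
import json
from typing import Optional

# Hand-written sample value of one Kafka message on dbserver1.inventory.customers.
# Illustrative only: a real event also carries a full "schema" section and more
# "source" metadata.
SAMPLE_EVENT = json.dumps({
    "schema": {},
    "payload": {
        "before": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                   "email": "sally.thomas@acme.com"},
        "after": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                  "email": "newemail@example.com"},
        "source": {"db": "inventory", "table": "customers"},
        "op": "u",
        "ts_ms": 1700000000000,
    },
})

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe(raw: str) -> Optional[dict]:
    """Extract the fields that raw:payload.op/before/after/ts_ms refer to."""
    if not raw:
        # By default Debezium follows each delete with a tombstone (null value).
        return None
    payload = json.loads(raw)["payload"]
    return {
        "op": OP_NAMES.get(payload["op"], payload["op"]),
        "before": payload["before"],  # None for create and snapshot events
        "after": payload["after"],    # None for delete events
        "ts_ms": payload["ts_ms"],
    }


if __name__ == "__main__":
    info = describe(SAMPLE_EVENT)
    print(info["op"], info["before"]["email"], "->", info["after"]["email"])
```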
Step 4: Create ClickHouse External Table
Connect Proton to ClickHouse:
CREATE EXTERNAL TABLE customers
SETTINGS type='clickhouse',
address='clickhouse:9000',
table='customers';
Step 5: Create Streaming ETL Pipeline
Set up a materialized view to continuously replicate data:
CREATE MATERIALIZED VIEW mv_mysql2ch INTO customers AS
SELECT
raw:payload.after.id::int32 AS id,
raw:payload.after.first_name AS first_name,
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email
FROM customers_cdc
WHERE raw:payload.op IN ('r', 'c') -- Initial snapshot + inserts
SETTINGS seek_to='earliest';
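This pipeline:
- Reads CDC events from Kafka, starting from the earliest offset
- Extracts the new record from payload.after
- Writes it to ClickHouse continuously
Note: This is append-only replication. Only snapshot reads (r) and inserts (c) are copied; updates and deletes in MySQL are not reflected in ClickHouse.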
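The filtering and projection this view performs can be modeled in a few lines of Python. This is a sketch of the logic only, not how Proton executes it; to_clickhouse_rows() and event() are hypothetical helpers, and the events are hand-written.

```python
import json
from typing import List, Optional, Tuple

Row = Tuple[int, str, str, str]  # (id, first_name, last_name, email)


def event(op: str, before: Optional[dict] = None, after: Optional[dict] = None) -> str:
    """Build a minimal Debezium-style JSON message (illustrative shape only)."""
    return json.dumps({"payload": {"op": op, "before": before, "after": after}})


def to_clickhouse_rows(raw_events: List[str]) -> List[Row]:
    """Mirror mv_mysql2ch: keep 'r' and 'c' events and project payload.after."""
    rows: List[Row] = []
    for raw in raw_events:
        if not raw:  # tombstone message after a delete
            continue
        payload = json.loads(raw)["payload"]
        if payload["op"] not in ("r", "c"):
            continue  # updates and deletes are dropped: append-only replication
        after = payload["after"]
        rows.append((int(after["id"]), after["first_name"],
                     after["last_name"], after["email"]))
    return rows


if __name__ == "__main__":
    john = {"id": 1005, "first_name": "John", "last_name": "Doe",
            "email": "john@example.com"}
    stream = [
        event("c", after=john),                                           # insert
        event("u", before=john, after={**john, "email": "jd@example.com"}),  # ignored
        event("d", before=john),                                          # ignored
        "",                                                               # tombstone
    ]
    print(to_clickhouse_rows(stream))
```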
Step 6: Test the Pipeline
Insert data in MySQL (the sample customers table already contains ids 1001 to 1004, so use a new id):
-- In MySQL
USE inventory;
INSERT INTO customers (id, first_name, last_name, email)
VALUES (1005, 'John', 'Doe', 'john@example.com');
Query in ClickHouse:
-- In ClickHouse
SELECT * FROM customers ORDER BY id DESC LIMIT 10;
You should see the new record appear within seconds.
Advanced Pattern: Full CDC with Updates and Deletes
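For complete change tracking, including updates and deletes, write the CDC events into a Proton changelog stream. The stream below is named customers, like the ClickHouse external table from Step 4, so drop that external table first or rename one of them.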
Step 1: Create Changelog Stream
CREATE STREAM customers(
id int,
first_name string,
last_name string,
email string
)
PRIMARY KEY id
SETTINGS mode='changelog_kv', version_column='_tp_time';
This creates a versioned key-value stream that maintains the current state.
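As a mental model, a changelog stream keyed by id behaves roughly like the dictionary below: a row with _tp_delta = 1 sets the current value for its key, and a row with _tp_delta = -1 retracts it. This Python sketch is a simplification for intuition (it applies changes strictly in arrival order and ignores version_column), not a description of Proton's storage; ChangelogState is a hypothetical name.

```python
from typing import Dict, Tuple

Row = Tuple[int, str, str, str]  # (id, first_name, last_name, email); id is the key


class ChangelogState:
    """Toy keyed changelog: +1 upserts a row, -1 retracts the row for its key."""

    def __init__(self) -> None:
        self.rows: Dict[int, Row] = {}

    def apply(self, row: Row, delta: int) -> None:
        key = row[0]
        if delta == 1:
            self.rows[key] = row       # insert or replace the current version
        elif delta == -1:
            self.rows.pop(key, None)   # retract; a missing key is a no-op
        else:
            raise ValueError(f"_tp_delta must be 1 or -1, got {delta}")


if __name__ == "__main__":
    state = ChangelogState()
    old = (1001, "Sally", "Thomas", "sally.thomas@acme.com")
    new = (1001, "Sally", "Thomas", "newemail@example.com")
    state.apply(old, 1)    # snapshot row
    state.apply(old, -1)   # update: retract the old version ...
    state.apply(new, 1)    # ... then add the new one
    print(state.rows)
```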
Step 2: Handle All CDC Operations
Create materialized views for each operation type:
Read (initial snapshot):
CREATE MATERIALIZED VIEW mv_customers_r INTO customers AS
SELECT
to_time(raw:payload.ts_ms) AS _tp_time,
raw:payload.after.id::int AS id,
raw:payload.after.first_name AS first_name,
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email,
1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='r'
SETTINGS seek_to='earliest';
Create (inserts):
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
SELECT
to_time(raw:payload.ts_ms) AS _tp_time,
raw:payload.after.id::int AS id,
raw:payload.after.first_name AS first_name,
raw:payload.after.last_name AS last_name,
raw:payload.after.email AS email,
1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='c'
SETTINGS seek_to='earliest';
Update (modifications):
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH cdc_changes AS (
SELECT
ts_ms,
array_join(changes) AS change,
change.1 AS val,
change.2 AS _tp_delta
FROM (
SELECT
to_time(raw:payload.ts_ms) AS ts_ms,
raw:payload.before AS before,
raw:payload.after AS after,
[(before, -1::int8), (after, 1::int8)] AS changes
FROM customers_cdc
WHERE raw:payload.op = 'u'
SETTINGS seek_to = 'earliest'
)
)
SELECT
ts_ms AS _tp_time,
val:id::int32 AS id,
val:first_name AS first_name,
val:last_name AS last_name,
val:email AS email,
_tp_delta
FROM cdc_changes;
Delete (removals):
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT
to_time(raw:payload.ts_ms) AS _tp_time,
raw:payload.before.id::int AS id,
raw:payload.before.first_name AS first_name,
raw:payload.before.last_name AS last_name,
raw:payload.before.email AS email,
-1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='d'
SETTINGS seek_to='earliest';
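Taken together, the four views turn each Debezium operation into one or two changelog rows. The Python sketch below restates that mapping in one function so the deltas are easy to compare. to_delta_rows() is a hypothetical helper, and it only models which rows and _tp_delta values the views emit, not how Proton runs them.

```python
import json
from typing import List, Optional, Tuple


def to_delta_rows(raw: str) -> List[Tuple[dict, int]]:
    """Return the (row, _tp_delta) pairs the r/c/u/d views write for one message."""
    if not raw:
        return []  # tombstone: no payload, none of the views match it
    payload = json.loads(raw)["payload"]
    op: str = payload["op"]
    before: Optional[dict] = payload["before"]
    after: Optional[dict] = payload["after"]
    if op in ("r", "c"):  # mv_customers_r / mv_customers_c
        return [(after, 1)]
    if op == "u":         # mv_customers_u: retract old version, add new one
        return [(before, -1), (after, 1)]
    if op == "d":         # mv_customers_d: retract the deleted row
        return [(before, -1)]
    return []             # any other op code is ignored, as in the views


if __name__ == "__main__":
    msg = json.dumps({"payload": {"op": "u",
                                  "before": {"id": 1001, "email": "old@example.com"},
                                  "after": {"id": 1001, "email": "new@example.com"}}})
    for row, delta in to_delta_rows(msg):
        print(delta, row)
```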
Step 3: Query Current State
Get the current snapshot (latest version of each row):
-- Streaming query (waits for new changes)
SELECT * FROM customers;
-- Table query (returns current state immediately)
SELECT * FROM table(customers);
Test updates and deletes:
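-- In MySQL
USE inventory;
UPDATE customers SET email = 'newemail@example.com' WHERE id = 1001;
-- Add a throwaway row and delete it, so the DELETE produces a change event
INSERT INTO customers (id, first_name, last_name, email)
VALUES (1010, 'Temp', 'Customer', 'temp@example.com');
DELETE FROM customers WHERE id = 1010;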
Query in Proton:
-- Latest version of customer 1001 (should show the updated email)
SELECT * FROM table(customers) WHERE id = 1001;
-- See only current state (excludes deleted rows)
SELECT * FROM table(customers) WHERE _tp_delta = 1;
Avro Format with Schema Registry
Step 1: Create an Avro Connector
Debezium can also produce CDC events in Avro format, with schemas stored in the Redpanda schema registry. This connector uses its own server ID, topic prefix, and schema history topic so that it does not collide with the JSON connector above. Since Debezium 2.0, the debezium/connect image no longer ships the Confluent Avro converter, so add its JAR files to the Connect plugin path before creating this connector.
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "inventory-connector-avro",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184055",
"topic.prefix": "dbserver2",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory-avro",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://redpanda:8081",
"value.converter.schema.registry.url": "http://redpanda:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "op,table"
}
}'
The ExtractNewRecordState transform (unwrap) replaces the before/after envelope with the row's new state, and delete.handling.mode=rewrite keeps delete events as rows flagged with a __deleted field. The add.fields list uses MySQL source fields; lsn exists only for PostgreSQL.
Step 2: Create Avro External Stream
CREATE EXTERNAL STREAM customers_avro(
id int,
first_name string,
last_name string,
email string
)
SETTINGS type='kafka',
brokers='redpanda:9092',
topic='dbserver2.inventory.customers',
data_format='Avro',
kafka_schema_registry_url='http://redpanda:8081';
Step 3: Query Avro CDC Stream
SELECT * FROM customers_avro WHERE _tp_time > earliest_ts();
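Because of the ExtractNewRecordState transform, Proton sees flat rows rather than the before/after envelope. The Python sketch below approximates what that transform does with delete.handling.mode=rewrite: most events become the after state, delete events become the before state, and a __deleted field marks which is which. It is a simplified model for intuition that omits the add.fields metadata and the Avro encoding, and unwrap() is a hypothetical name. The customers_avro stream above does not declare a __deleted column, so deleted rows would likely look like ordinary rows unless that column is added.

```python
from typing import Optional


def unwrap(value: Optional[dict]) -> Optional[dict]:
    """Approximate ExtractNewRecordState with delete.handling.mode=rewrite."""
    if value is None:
        return None  # tombstone; passed through because drop.tombstones is "false"
    if value["op"] == "d":
        row = dict(value["before"])  # deletes carry the last known state
        row["__deleted"] = "true"
    else:
        row = dict(value["after"])   # c, r and u carry the new state
        row["__deleted"] = "false"
    return row


if __name__ == "__main__":
    print(unwrap({"op": "d", "before": {"id": 1010}, "after": None}))
```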
Use Case: Real-time Analytics on CDC Data
Perform analytics on database changes:
Count Changes per Minute by Operation
SELECT
window_start,
count() AS change_count,
count_if(raw:payload.op = 'c') AS inserts,
count_if(raw:payload.op = 'u') AS updates,
count_if(raw:payload.op = 'd') AS deletes
FROM tumble(customers_cdc, 1m)
GROUP BY window_start;
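The tumbling-window arithmetic behind this query is simple: each event falls into the window that starts at its timestamp rounded down to the minute. The Python sketch below reproduces that bucketing over (timestamp, op) pairs. tumble_counts() is a hypothetical helper; it uses a plain millisecond timestamp in place of the stream's event time and ignores when Proton actually emits each window (watermarks, late events).

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Tuple

WINDOW_MS = 60_000  # 1m tumbling window, as in tumble(customers_cdc, 1m)


def tumble_counts(events: Iterable[Tuple[int, str]]) -> Dict[int, Counter]:
    """Group (timestamp_ms, op) pairs into 1-minute windows keyed by window start."""
    windows: Dict[int, Counter] = defaultdict(Counter)
    for ts_ms, op in events:
        window_start = ts_ms - ts_ms % WINDOW_MS  # align to the window boundary
        windows[window_start][op] += 1
    return dict(windows)


if __name__ == "__main__":
    sample = [(0, "c"), (59_999, "u"), (60_000, "d"), (61_000, "u")]
    for start, ops in sorted(tumble_counts(sample).items()):
        # window_start, change_count, inserts, updates, deletes
        print(start, sum(ops.values()), ops["c"], ops["u"], ops["d"])
```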
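Detect High-frequency Updates
Count updates per record within each one-minute window and flag records updated more than 10 times:
SELECT
window_start,
raw:payload.after.id AS record_id,
count() AS update_count
FROM tumble(customers_cdc, 1m)
WHERE raw:payload.op = 'u'
GROUP BY window_start, record_id
HAVING update_count > 10;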
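The HAVING filter amounts to counting updates per record and keeping the counts above a threshold. The Python sketch below models that with an optional window size: with window_ms=None it counts over the whole stream, as an unwindowed GROUP BY does, and with a window size it counts per tumbling window, as a tumble() query does. hot_records() is a hypothetical helper and the inputs are made up.

```python
from collections import Counter
from typing import Iterable, List, Optional, Tuple


def hot_records(updates: Iterable[Tuple[int, int]], threshold: int = 10,
                window_ms: Optional[int] = None) -> List[Tuple[Optional[int], int, int]]:
    """updates: (timestamp_ms, record_id) pairs for op='u' events.

    Returns (window_start or None, record_id, update_count) for every group
    whose count exceeds threshold, mirroring HAVING update_count > 10.
    """
    counts: Counter = Counter()
    for ts_ms, record_id in updates:
        window_start = None if window_ms is None else ts_ms - ts_ms % window_ms
        counts[(window_start, record_id)] += 1
    return sorted((w, rid, n) for (w, rid), n in counts.items() if n > threshold)


if __name__ == "__main__":
    # Six updates to record 1001 in each of two consecutive minutes.
    ups = [(i * 1000, 1001) for i in range(6)] + [(60_000 + i * 1000, 1001) for i in range(6)]
    print(hot_records(ups))                    # whole stream: 12 updates
    print(hot_records(ups, window_ms=60_000))  # per minute: 6 each, below threshold
```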
Audit Trail
CREATE MATERIALIZED VIEW customer_audit_log AS
SELECT
to_time(raw:payload.ts_ms) AS change_time,
raw:payload.op AS operation,
raw:payload.before AS old_value,
raw:payload.after AS new_value,
raw:payload.source.table AS table_name
FROM customers_cdc;
PostgreSQL CDC
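The same pattern works for PostgreSQL. This connector assumes a postgres service has been added to the stack and runs with logical decoding enabled (wal_level=logical), which the pgoutput plugin requires: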
curl --request POST \
--url http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data '{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "inventory",
"topic.prefix": "pgserver1",
"plugin.name": "pgoutput"
}
}'
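Debezium names topics <topic.prefix>.<namespace>.<table>, where the namespace is the database name for MySQL and the schema name for PostgreSQL, so the topic setting of the Proton external stream changes accordingly. For example, tables in PostgreSQL's default public schema land on pgserver1.public.<table>. A tiny Python helper (hypothetical, for illustration) makes the rule explicit:

```python
def debezium_topic(prefix: str, namespace: str, table: str) -> str:
    """Default Debezium topic name: prefix, then database (MySQL) or schema
    (PostgreSQL), then table, joined with dots."""
    return f"{prefix}.{namespace}.{table}"


if __name__ == "__main__":
    print(debezium_topic("dbserver1", "inventory", "customers"))  # MySQL: database
    print(debezium_topic("pgserver1", "public", "customers"))     # PostgreSQL: schema
```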
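Best Practices
Use seek_to='earliest' Carefully
A view created with seek_to='earliest' replays the whole topic. Recreating such a view re-delivers every event, which duplicates rows in append-only targets such as the ClickHouse MergeTree table above.
-- Initial setup: read the topic from the beginning
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT * FROM source
SETTINGS seek_to='earliest';
-- Otherwise, read only new data (or start from a specific offset)
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT * FROM source
SETTINGS seek_to='latest';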
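The risk with replaying is easiest to see with two toy sinks. In the Python sketch below, delivering the same events twice (as recreating a view with seek_to='earliest' would) doubles the rows in an append-only sink, while a sink keyed by id ends up unchanged. Both sinks are simplified stand-ins, not models of ClickHouse or Proton internals, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]  # (id, email)


def deliver_append_only(sink: List[Row], events: List[Row]) -> None:
    """Append-only target (like a plain MergeTree table): every row is kept."""
    sink.extend(events)


def deliver_keyed(sink: Dict[int, Row], events: List[Row]) -> None:
    """Keyed target: a replayed row overwrites the copy already stored."""
    for row in events:
        sink[row[0]] = row


if __name__ == "__main__":
    events = [(1001, "a@example.com"), (1002, "b@example.com")]
    append_only: List[Row] = []
    keyed: Dict[int, Row] = {}
    for _ in range(2):  # initial run, then a full replay from the earliest offset
        deliver_append_only(append_only, events)
        deliver_keyed(keyed, events)
    print(len(append_only), len(keyed))
```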
Batch Writes to ClickHouse
-- Configure batch size
SETTINGS max_insert_block_size = 100000;
Filter Early
-- Filter in the source stream
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT *
FROM cdc_stream
WHERE raw:payload.op IN ('c', 'u') -- Only inserts and updates
AND raw:payload.after.active = true; -- Business logic filter
Monitoring CDC Pipeline
Check Connector Status
curl http://localhost:8083/connectors/inventory-connector/status
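The status endpoint returns JSON with a connector state and one entry per task. A quick health check can require every state to be RUNNING, as in the Python sketch below. connector_healthy() is a hypothetical helper, and the sample response is hand-written to match the general shape of the Kafka Connect REST API, not captured from this stack.

```python
import json

SAMPLE_STATUS = """
{"name": "inventory-connector",
 "connector": {"state": "RUNNING", "worker_id": "172.18.0.5:8083"},
 "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "172.18.0.5:8083"}],
 "type": "source"}
"""


def connector_healthy(status_json: str) -> bool:
    """True only if the connector and all of its tasks are RUNNING."""
    status = json.loads(status_json)
    if status["connector"]["state"] != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector with no tasks is not capturing anything yet.
    return bool(tasks) and all(task["state"] == "RUNNING" for task in tasks)


if __name__ == "__main__":
    print(connector_healthy(SAMPLE_STATUS))
```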
Monitor Lag
SELECT
topic,
partition,
latest_offset,
consumer_offset,
latest_offset - consumer_offset AS lag
FROM system.kafka_consumers;
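Consumer lag per partition is the latest offset minus the consumer's offset, and total lag is the sum over partitions. The Python sketch below does that arithmetic on (partition, latest_offset, consumer_offset) tuples, reusing the column names from the query above. Where those numbers come from depends on your setup, and partition_lag() is a hypothetical helper.

```python
from typing import Dict, Iterable, Tuple


def partition_lag(offsets: Iterable[Tuple[int, int, int]]) -> Tuple[Dict[int, int], int]:
    """offsets: (partition, latest_offset, consumer_offset) triples.

    Returns the lag of each partition and the total across partitions.
    """
    per_partition = {
        # Clamp at 0: the two offsets may be sampled at slightly different moments.
        partition: max(latest - consumed, 0)
        for partition, latest, consumed in offsets
    }
    return per_partition, sum(per_partition.values())


if __name__ == "__main__":
    per, total = partition_lag([(0, 120, 100), (1, 50, 50)])
    print(per, total)
```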
Track Processing Rate
SELECT
name,
rows_read,
bytes_read
FROM system.materialized_views
WHERE name LIKE 'mv_customers%';