Change Data Capture (CDC) lets you track and process changes from OLTP databases in real time. Timeplus Proton reads the change events that CDC tools such as Debezium publish to Kafka, so you can run streaming SQL over them and replicate the results to downstream systems.

Use Case Overview

CDC with Proton enables:
  • Real-time Database Replication: Sync changes from MySQL/PostgreSQL to ClickHouse or other databases
  • Event-driven Architectures: Trigger downstream processes based on data changes
  • Audit Logging: Track all modifications to database records
  • Data Warehouse Synchronization: Keep analytics databases up-to-date
  • Microservice Data Consistency: Propagate changes across service boundaries

Architecture

A typical CDC pipeline looks like:
MySQL/PostgreSQL → Debezium → Kafka → Timeplus Proton → ClickHouse/Kafka/Applications
     (Source)      (CDC)    (Transport)  (Transform)      (Destinations)
Components:
  • Debezium: Captures changes from database transaction logs
  • Kafka/Redpanda: Streams CDC events reliably
  • Timeplus Proton: Transforms and routes CDC data with SQL
  • Destination: ClickHouse, another database, or application

Tutorial: MySQL to ClickHouse Replication

This example demonstrates capturing MySQL changes and replicating them to ClickHouse via Proton.

Prerequisites

Start the CDC demo stack:
version: '3.8'
services:
  mysql:
    image: debezium/example-mysql:latest
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw

  redpanda:
    image: docker.redpanda.com/redpandadata/redpanda:latest
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092

  connect:
    image: debezium/connect:latest
    ports:
      - "8083:8083"  # Expose the Kafka Connect REST API used by the curl commands
    environment:
      BOOTSTRAP_SERVERS: redpanda:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

  proton:
    image: d.timeplus.com/timeplus-io/proton:latest

  clickhouse:
    image: clickhouse/clickhouse-server:latest
Run: docker compose up -d

Step 1: Create ClickHouse Table

In ClickHouse, create the destination table:
CREATE TABLE customers
(
    id Int32,
    first_name String,
    last_name String,
    email String
)
ENGINE=MergeTree()
PRIMARY KEY (id);

Step 2: Configure Debezium Connector

Create a Debezium MySQL connector:
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}'
This creates a connector that captures changes from the inventory database.
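The request body can also be assembled programmatically, which makes it easier to see how the Kafka topic name used in the next step follows from the connector settings. The sketch below is a minimal illustration using only the Python standard library; the helper names `build_mysql_connector` and `topic_for` are hypothetical, and the values simply mirror the curl example above.

```python
import json


def build_mysql_connector(name, topic_prefix, database, server_id,
                          bootstrap="redpanda:9092"):
    """Return the JSON body for POST /connectors, mirroring the curl example."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": str(server_id),
            "topic.prefix": topic_prefix,
            "database.include.list": database,
            "schema.history.internal.kafka.bootstrap.servers": bootstrap,
            "schema.history.internal.kafka.topic": f"schema-changes.{database}",
        },
    }


def topic_for(payload, table):
    """Debezium's MySQL connector publishes to <topic.prefix>.<database>.<table>."""
    cfg = payload["config"]
    return f'{cfg["topic.prefix"]}.{cfg["database.include.list"]}.{table}'


payload = build_mysql_connector("inventory-connector", "dbserver1", "inventory", 184054)
body = json.dumps(payload, indent=2)  # What you would pass to curl --data
print(topic_for(payload, "customers"))
```

The printed topic, `dbserver1.inventory.customers`, is the one the external stream subscribes to in Step 3.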

Step 3: Create External Stream in Proton

Read CDC events from Kafka:
CREATE EXTERNAL STREAM customers_cdc(raw string)
SETTINGS type='kafka',
         brokers='redpanda:9092',
         topic='dbserver1.inventory.customers';
Examine the CDC event structure:
SELECT raw FROM customers_cdc LIMIT 1;
Debezium CDC events contain:
  • payload.op: Operation type (c=create, u=update, d=delete, r=read/snapshot)
  • payload.before: Record state before change
  • payload.after: Record state after change
  • payload.ts_ms: Change timestamp
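To make the envelope fields listed above concrete, here is a small Python sketch that parses one event and pulls out the same four fields. The sample event is hypothetical and heavily trimmed (a real Debezium message also carries a schema and a source block), and `describe` is an illustrative helper, not part of any library.

```python
import json
from datetime import datetime, timezone

# Hypothetical, trimmed-down Debezium update event
raw = json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 1001, "email": "old@example.com"},
        "after": {"id": 1001, "email": "new@example.com"},
        "ts_ms": 1700000000000,
    }
})

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe(raw_event):
    """Extract operation, before/after images, and change time from one event."""
    payload = json.loads(raw_event)["payload"]
    # ts_ms is milliseconds since the Unix epoch
    changed_at = datetime.fromtimestamp(payload["ts_ms"] / 1000, tz=timezone.utc)
    return {
        "operation": OP_NAMES[payload["op"]],
        "before": payload["before"],
        "after": payload["after"],
        "changed_at": changed_at,
    }


info = describe(raw)
print(info["operation"], info["changed_at"].isoformat())
```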

Step 4: Create ClickHouse External Table

Connect Proton to ClickHouse:
CREATE EXTERNAL TABLE customers
SETTINGS type='clickhouse',
         address='clickhouse:9000',
         table='customers';

Step 5: Create Streaming ETL Pipeline

Set up a materialized view to continuously replicate data:
CREATE MATERIALIZED VIEW mv_mysql2ch INTO customers AS
    SELECT
        raw:payload.after.id::int32 AS id,
        raw:payload.after.first_name AS first_name,
        raw:payload.after.last_name AS last_name,
        raw:payload.after.email AS email
    FROM customers_cdc
    WHERE raw:payload.op IN ('r', 'c')  -- Initial snapshot + inserts
    SETTINGS seek_to='earliest';
This pipeline:
  1. Reads CDC events from Kafka
  2. Extracts the new record from payload.after
  3. Writes to ClickHouse continuously
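Note: This is append-only replication. Only snapshot reads and inserts reach ClickHouse; updates and deletes in MySQL are ignored, so the ClickHouse copy drifts from the source once existing rows change.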
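The filtering and projection this materialized view performs can be sketched in plain Python. This is only a model of the logic, not of how Proton executes it; `to_insert_rows` and the sample events are hypothetical.

```python
def to_insert_rows(events):
    """Keep snapshot ('r') and insert ('c') events and project payload.after."""
    rows = []
    for event in events:
        payload = event["payload"]
        if payload["op"] in ("r", "c"):
            after = payload["after"]
            rows.append((int(after["id"]), after["first_name"],
                         after["last_name"], after["email"]))
    return rows


# Hypothetical events: one snapshot read, one insert, one update, one delete
sally = {"id": 1, "first_name": "Sally", "last_name": "Thomas", "email": "s@example.com"}
john = {"id": 2, "first_name": "John", "last_name": "Doe", "email": "j@example.com"}
events = [
    {"payload": {"op": "r", "before": None, "after": sally}},
    {"payload": {"op": "c", "before": None, "after": john}},
    {"payload": {"op": "u", "before": john, "after": {**john, "email": "jd@example.com"}}},
    {"payload": {"op": "d", "before": sally, "after": None}},
]

rows = to_insert_rows(events)
print(rows)
```

Only the first two events produce rows; the update and the delete are dropped, which is the limitation the full changelog pattern later in this guide addresses.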

Step 6: Test the Pipeline

Insert data in MySQL:
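-- In MySQL
USE inventory;
-- The Debezium example database already contains customers 1001-1004,
-- so let AUTO_INCREMENT assign the next id instead of reusing 1001.
INSERT INTO customers (first_name, last_name, email)
VALUES ('John', 'Doe', 'john@example.com');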
Query in ClickHouse:
-- In ClickHouse
SELECT * FROM customers ORDER BY id DESC LIMIT 10;
You should see the new record appear within seconds.

Advanced Pattern: Full CDC with Updates and Deletes

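For complete change tracking including updates and deletes, use Proton’s changelog stream. The stream below is also named customers, so drop the external table and materialized view from the tutorial first, or pick a different name for the stream.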

Step 1: Create Changelog Stream

CREATE STREAM customers(
    id int,
    first_name string,
    last_name string,
    email string
)
PRIMARY KEY id
SETTINGS mode='changelog_kv', version_column='_tp_time';
This creates a versioned key-value stream that maintains the current state.

Step 2: Handle All CDC Operations

Create a materialized view for each operation type:
Read (initial snapshot):
CREATE MATERIALIZED VIEW mv_customers_r INTO customers AS
    SELECT
        to_time(raw:payload.ts_ms) AS _tp_time,
        raw:payload.after.id::int AS id,
        raw:payload.after.first_name AS first_name,
        raw:payload.after.last_name AS last_name,
        raw:payload.after.email AS email,
        1::int8 AS _tp_delta
    FROM customers_cdc
    WHERE raw:payload.op='r'
    SETTINGS seek_to='earliest';
Create (inserts):
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
    SELECT
        to_time(raw:payload.ts_ms) AS _tp_time,
        raw:payload.after.id::int AS id,
        raw:payload.after.first_name AS first_name,
        raw:payload.after.last_name AS last_name,
        raw:payload.after.email AS email,
        1::int8 AS _tp_delta
    FROM customers_cdc
    WHERE raw:payload.op='c'
    SETTINGS seek_to='earliest';
Update (modifications):
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH cdc_changes AS (
    SELECT
        ts_ms,
        array_join(changes) AS change,
        change.1 AS val,
        change.2 AS _tp_delta
    FROM (
        SELECT
            to_time(raw:payload.ts_ms) AS ts_ms,
            raw:payload.before AS before,
            raw:payload.after AS after,
            [(before, -1::int8), (after, 1::int8)] AS changes
        FROM customers_cdc
        WHERE raw:payload.op = 'u'
        SETTINGS seek_to = 'earliest'
    )
)
SELECT
    ts_ms AS _tp_time,
    val:id::int32 AS id,
    val:first_name AS first_name,
    val:last_name AS last_name,
    val:email AS email,
    _tp_delta
FROM cdc_changes;
Delete (removals):
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
    SELECT
        to_time(raw:payload.ts_ms) AS _tp_time,
        raw:payload.before.id::int AS id,
        raw:payload.before.first_name AS first_name,
        raw:payload.before.last_name AS last_name,
        raw:payload.before.email AS email,
        -1::int8 AS _tp_delta
    FROM customers_cdc
    WHERE raw:payload.op='d'
    SETTINGS seek_to='earliest';
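The four views above turn each Debezium operation into rows tagged with `_tp_delta`: +1 for a row that now exists, -1 for a row that no longer does, and an update becomes a -1/+1 pair. The Python sketch below is a mental model of that mapping and of how the deltas fold into current state. It is not Proton's implementation, and `to_changelog_rows` and `apply_changelog` are hypothetical names.

```python
def to_changelog_rows(payload):
    """Map one Debezium payload to (row, delta) pairs, like the four views above."""
    op = payload["op"]
    if op in ("r", "c"):
        return [(payload["after"], 1)]
    if op == "u":
        # Retract the old image, then add the new one
        return [(payload["before"], -1), (payload["after"], 1)]
    if op == "d":
        return [(payload["before"], -1)]
    raise ValueError(f"unexpected op: {op!r}")


def apply_changelog(state, rows):
    """Fold (row, delta) pairs into a dict keyed by primary key."""
    for row, delta in rows:
        if delta == 1:
            state[row["id"]] = row
        else:
            state.pop(row["id"], None)
    return state


events = [
    {"op": "c", "before": None, "after": {"id": 1, "email": "a@example.com"}},
    {"op": "u", "before": {"id": 1, "email": "a@example.com"},
     "after": {"id": 1, "email": "b@example.com"}},
    {"op": "c", "before": None, "after": {"id": 2, "email": "c@example.com"}},
    {"op": "d", "before": {"id": 2, "email": "c@example.com"}, "after": None},
]

state = {}
for payload in events:
    apply_changelog(state, to_changelog_rows(payload))
print(state)
```

After replaying the four events, only customer 1 remains, carrying the updated email.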

Step 3: Query Current State

Get the current snapshot (latest version of each row):
-- Streaming query (waits for new changes)
SELECT * FROM customers;

-- Table query (returns current state immediately)
SELECT * FROM table(customers);
Test updates and deletes:
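-- In MySQL
UPDATE customers SET email = 'newemail@example.com' WHERE id = 1001;
-- Delete a row that exists; a DELETE that matches nothing emits no CDC event
INSERT INTO customers (first_name, last_name, email)
VALUES ('Temp', 'User', 'temp@example.com');
DELETE FROM customers WHERE email = 'temp@example.com';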
Query in Proton:
-- See all historical changes
SELECT * FROM table(customers) WHERE id = 1001;

-- See only current state (excludes deleted rows)
SELECT * FROM table(customers) WHERE _tp_delta = 1;

CDC with Avro Format

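Debezium can also produce CDC events in Avro format with a schema registry. The connector below reuses the server ID and topic prefix of the JSON connector, so delete that connector first or change both values. Its ExtractNewRecordState transform flattens each event to the row itself, which is why the external stream in Step 2 declares the table columns directly: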

Step 1: Configure Debezium with Avro

curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "inventory-connector-avro",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "redpanda:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://redpanda:8081",
    "value.converter.schema.registry.url": "http://redpanda:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "transforms.unwrap.add.fields": "table,lsn"
  }
}'
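The effect of the `unwrap` transform configured above can be approximated in a few lines of Python. This is a simplified model rather than Debezium's code: it covers only the `__deleted` marker added by `delete.handling.mode=rewrite` and the `__table` field from `add.fields`, and the `unwrap` function and sample envelope are hypothetical.

```python
def unwrap(envelope):
    """Simplified model of ExtractNewRecordState with delete.handling.mode=rewrite."""
    payload = envelope["payload"]
    deleted = payload["op"] == "d"
    # Deletes keep the 'before' image; every other operation keeps 'after'
    row = dict(payload["before"] if deleted else payload["after"])
    row["__deleted"] = "true" if deleted else "false"
    # Fields listed in add.fields are added with a double-underscore prefix
    row["__table"] = payload["source"]["table"]
    return row


# Hypothetical delete event for the customers table
envelope = {
    "payload": {
        "op": "d",
        "before": {"id": 7, "email": "gone@example.com"},
        "after": None,
        "source": {"table": "customers"},
    }
}

flat = unwrap(envelope)
print(flat)
```

Because the message value is already a flat row, consumers no longer need to navigate `payload.before` and `payload.after`.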

Step 2: Create Avro External Stream

CREATE EXTERNAL STREAM customers_avro(
    id int,
    first_name string,
    last_name string,
    email string
)
SETTINGS type='kafka',
         brokers='redpanda:9092',
         topic='dbserver1.inventory.customers',
         data_format='Avro',
         kafka_schema_registry_url='http://redpanda:8081';

Step 3: Query Avro CDC Stream

SELECT * FROM customers_avro WHERE _tp_time > earliest_ts();

Use Case: Real-time Analytics on CDC Data

Perform analytics on database changes:

Track Change Volume per Minute

SELECT
    window_start,
    count() AS change_count,
    count_if(raw:payload.op = 'c') AS inserts,
    count_if(raw:payload.op = 'u') AS updates,
    count_if(raw:payload.op = 'd') AS deletes
FROM tumble(customers_cdc, 1m)
GROUP BY window_start;
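A tumbling window assigns every event to exactly one fixed-size, non-overlapping bucket. The Python sketch below shows that bucketing for one-minute windows. It is an illustration of the idea only, with hypothetical names and hand-written timestamps in milliseconds; Proton's `tumble` works on the stream's event time.

```python
from collections import Counter, defaultdict

WINDOW_MS = 60_000  # One-minute windows, matching tumble(customers_cdc, 1m)


def tumble_counts(events, window_ms=WINDOW_MS):
    """Count operations per tumbling window; events are (timestamp_ms, op) pairs."""
    windows = defaultdict(Counter)
    for ts_ms, op in events:
        window_start = ts_ms - ts_ms % window_ms  # Floor to the window boundary
        windows[window_start][op] += 1
    return {start: dict(counts) for start, counts in sorted(windows.items())}


# Hypothetical events: two fall in the first minute, two in the second
events = [(0, "c"), (59_999, "u"), (60_000, "u"), (61_000, "d")]
counts = tumble_counts(events)
print(counts)
```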

Detect High-frequency Updates

SELECT
    raw:payload.after.id AS record_id,
    count() AS update_count
FROM customers_cdc
WHERE raw:payload.op = 'u'
GROUP BY record_id
HAVING update_count > 10;

Audit Trail

CREATE MATERIALIZED VIEW customer_audit_log AS
SELECT
    to_time(raw:payload.ts_ms) AS change_time,
    raw:payload.op AS operation,
    raw:payload.before AS old_value,
    raw:payload.after AS new_value,
    raw:payload.source.table AS table_name
FROM customers_cdc;

PostgreSQL CDC

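The same pattern works for PostgreSQL. Debezium names PostgreSQL topics <topic.prefix>.<schema>.<table>, so with the connector below a customers table in the public schema is published to pgserver1.public.customers: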
curl --request POST \
  --url http://localhost:8083/connectors \
  --header 'Content-Type: application/json' \
  --data '{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "inventory",
    "topic.prefix": "pgserver1",
    "plugin.name": "pgoutput"
  }
}'

Performance Tips

Use seek_to='earliest' Carefully

-- Only use on initial setup
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT * FROM source
SETTINGS seek_to='earliest';  -- Reads from beginning

-- For production, use latest or specific offset
SETTINGS seek_to='latest';  -- Only new data

Batch Writes to ClickHouse

-- Configure batch size
SETTINGS max_insert_block_size = 100000;

Filter Early

-- Filter in the source stream
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT *
FROM cdc_stream
WHERE raw:payload.op IN ('c', 'u')  -- Only inserts and updates
  AND raw:payload.after.active::bool = true;  -- Business logic filter; JSON paths return strings, so cast first (active is a hypothetical column)

Monitoring CDC Pipeline

Check Connector Status

curl http://localhost:8083/connectors/inventory-connector/status
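The status endpoint returns a JSON document with one state for the connector and one per task. The sketch below shows one way to reduce that document to a health flag. The `connector_health` helper and the sample response are hypothetical; the response is shaped like a typical Kafka Connect status payload, with an illustrative worker address and error trace.

```python
def connector_health(status):
    """Summarize a Kafka Connect status document into a simple health report."""
    tasks = status.get("tasks", [])
    states = [status["connector"]["state"]] + [t["state"] for t in tasks]
    failed = [t["id"] for t in tasks if t["state"] == "FAILED"]
    return {
        "healthy": all(state == "RUNNING" for state in states),
        "failed_tasks": failed,
    }


# Hypothetical response: the connector is running but its only task has failed
status = {
    "name": "inventory-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "connect:8083",
         "trace": "example stack trace"},
    ],
    "type": "source",
}

report = connector_health(status)
print(report)
```

Checking the task states matters because a connector can report RUNNING while a task underneath it has failed and stopped producing events.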

Monitor Lag

SELECT
    topic,
    partition,
    latest_offset,
    consumer_offset,
    latest_offset - consumer_offset AS lag
FROM system.kafka_consumers;

Track Processing Rate

SELECT
    name,
    rows_read,
    bytes_read
FROM system.materialized_views
WHERE name LIKE 'mv_customers%';
