Timeplus Proton provides native integration with Apache Kafka and compatible platforms like Redpanda, Confluent Cloud, and AWS MSK through External Streams.

CREATE EXTERNAL STREAM

Create an external stream to read from or write to Kafka:
CREATE EXTERNAL STREAM stream_name (
  column1 type1,
  column2 type2,
  ...
)
SETTINGS
  type='kafka',
  brokers='broker1:9092,broker2:9092',
  topic='topic_name';

Basic Example

CREATE EXTERNAL STREAM kafka_events (
  device string,
  temperature float,
  timestamp int64
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='sensor_data';

Authentication

SASL/PLAIN

CREATE EXTERNAL STREAM secure_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9093',
  topic='secure_topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='PLAIN',
  username='your_username',
  password='your_password';

SASL/SCRAM

Supported mechanisms are SCRAM-SHA-256 and SCRAM-SHA-512. Set sasl_mechanism to the one your cluster uses:
CREATE EXTERNAL STREAM scram_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9093',
  topic='topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='SCRAM-SHA-512',
  username='user',
  password='password';

AWS MSK IAM Authentication

CREATE EXTERNAL STREAM aws_msk_stream (
  device string,
  temperature float
)
SETTINGS
  type='kafka',
  brokers='b-1.cluster.kafka.us-west-2.amazonaws.com:9098',
  topic='my_topic',
  security_protocol='SASL_SSL',
  sasl_mechanism='AWS_MSK_IAM';

SSL/TLS Configuration

CREATE EXTERNAL STREAM ssl_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9093',
  topic='topic',
  security_protocol='SSL',
  ssl_ca_cert_file='/path/to/ca-cert.pem',
  skip_ssl_cert_check=false;
Alternatively, embed the CA certificate directly as a PEM string:
SETTINGS
  ...
  ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...';
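SASL authentication and a private CA can be combined in one stream. The sketch below assumes the broker at broker:9093 presents a certificate signed by a private CA, and that ssl_ca_cert_file is honored with SASL_SSL just as it is with SSL (both use TLS). The stream name, credentials, and certificate path are placeholders.

```sql
-- Hypothetical stream: SCRAM authentication over TLS with a private CA
CREATE EXTERNAL STREAM scram_tls_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9093',
  topic='topic',
  security_protocol='SASL_SSL',          -- TLS transport plus SASL auth
  sasl_mechanism='SCRAM-SHA-256',
  username='user',                       -- placeholder credentials
  password='password',
  ssl_ca_cert_file='/path/to/ca-cert.pem';  -- assumed to apply to SASL_SSL too
```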

Configuration Options

Core Settings

| Setting | Type | Description | Required |
| --- | --- | --- | --- |
| type | String | Must be 'kafka' or 'redpanda' | Yes |
| brokers | String | Comma-separated list of brokers | Yes |
| topic | String | Kafka topic name | Yes |
| security_protocol | String | Protocol: plaintext, SASL_PLAINTEXT, SASL_SSL, or SSL | No (default: plaintext) |
| sasl_mechanism | String | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or AWS_MSK_IAM | No |
| username | String | SASL username | No |
| password | String | SASL password | No |
| ssl_ca_cert_file | String | Path to CA certificate file | No |
| ssl_ca_pem | String | CA certificate string (PEM format) | No |
| skip_ssl_cert_check | Bool | Skip SSL certificate verification | No (default: false) |
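Since type also accepts 'redpanda', here is a minimal sketch for a Redpanda cluster that authenticates with SCRAM on an unencrypted listener (SASL_PLAINTEXT). The host redpanda:9092, topic, and credentials are hypothetical. SASL_PLAINTEXT authenticates the client but does not encrypt traffic, so it is only appropriate on a trusted network.

```sql
-- Hypothetical Redpanda stream using SCRAM without TLS
CREATE EXTERNAL STREAM redpanda_stream (
  data string
)
SETTINGS
  type='redpanda',
  brokers='redpanda:9092',            -- hypothetical broker address
  topic='events',                     -- hypothetical topic
  security_protocol='SASL_PLAINTEXT', -- authenticated but unencrypted
  sasl_mechanism='SCRAM-SHA-256',
  username='user',
  password='password';
```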

Advanced Settings

| Setting | Type | Description | Default |
| --- | --- | --- | --- |
| properties | String | Semicolon-separated Kafka client properties | Empty |
| one_message_per_row | Bool | Produce one message per row for row-based formats | false |
| data_format | String | Message format (e.g., JSONEachRow, Avro) | JSONEachRow |
| poll_waittime_ms | UInt64 | Poll wait time in milliseconds | 500 |
| consumer_stall_timeout_ms | Milliseconds | Consumer stall timeout | 60000 |
| connection_timeout_ms | Milliseconds | Connection timeout | 10000 |
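The advanced settings go in the same SETTINGS clause as the core ones. This sketch sets several of them explicitly on a hypothetical stream; the stream and topic names are made up, and poll_waittime_ms=100 is an illustrative value, not a recommendation.

```sql
-- Hypothetical stream with advanced settings spelled out
CREATE EXTERNAL STREAM json_rows (
  device string,
  temperature float
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='json_rows',            -- hypothetical topic
  data_format='JSONEachRow',    -- same as the default, stated explicitly
  one_message_per_row=true,     -- each inserted row becomes its own message
  poll_waittime_ms=100;         -- illustrative; the default is 500
```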

Custom Kafka Properties

Use the properties setting to pass additional Kafka client configurations:
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='topic',
  properties='message.max.bytes=1048576;compression.type=snappy;max.in.flight.requests.per.connection=5';
Format: key1=value1;key2=value2;...

Reserved Virtual Columns

Proton provides virtual columns for Kafka metadata:
| Column | Type | Description |
| --- | --- | --- |
| _tp_append_time | DateTime64(3, 'UTC') | Message append timestamp |
| _tp_event_time | DateTime64(3, 'UTC') | Event timestamp |
| _tp_process_time | DateTime64(3, 'UTC') | Processing timestamp |
| _tp_shard | Int32 | Kafka partition ID |
| _tp_sn | Int64 | Kafka offset (sequence number) |
| _tp_message_key | String | Message key |
| _tp_message_headers | Map(String, String) | Message headers |
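Virtual columns are selected like ordinary columns. This sketch reads the kafka_events stream from the basic example and reports the partition, offset, and key of each message alongside its payload; _tp_message_key reflects whatever key the producer set.

```sql
-- Show where each message came from in the topic
SELECT
  _tp_shard        AS partition_id,  -- Kafka partition
  _tp_sn           AS offset,        -- Kafka offset within that partition
  _tp_message_key  AS message_key,
  device,
  temperature
FROM kafka_events;
```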

Using Message Keys

Define _tp_message_key as a physical column to control message keys when writing:
CREATE EXTERNAL STREAM keyed_stream (
  device string,
  value float,
  _tp_message_key string
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='keyed_topic';
Supported key types are numeric types, string, and their nullable variants.
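When writing, supply _tp_message_key like any other column. This sketch uses the device name as the key, so with key-hash partitioning (the usual Kafka client default) all readings from one device should land in the same partition. The values are illustrative.

```sql
-- Key each message by device so a device's readings stay in one partition
INSERT INTO keyed_stream (device, value, _tp_message_key)
VALUES ('sensor1', 21.5, 'sensor1'),
       ('sensor2', 19.0, 'sensor2');
```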

Reading from Kafka

Streaming Query

-- Read new messages continuously
SELECT * FROM kafka_stream;

Historical Query

-- Read all existing messages
SELECT * FROM table(kafka_stream);

Read Specific Partitions

SELECT * FROM kafka_stream SETTINGS shards='0,2,5';

Seek to Position

-- Start from beginning
SELECT * FROM kafka_stream SETTINGS seek_to='earliest';

-- Start from latest
SELECT * FROM kafka_stream SETTINGS seek_to='latest';

-- Start from specific offset
SELECT * FROM kafka_stream SETTINGS seek_to='12345';
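Partition selection and seeking can be combined, and historical queries can filter on the virtual columns instead. This sketch assumes kafka_stream has the device and temperature columns used in the INSERT example and that multiple query settings are comma-separated in one SETTINGS clause.

```sql
-- Streaming: replay only partition 0, starting from its earliest offset
SELECT * FROM kafka_stream SETTINGS shards='0', seek_to='earliest';

-- Historical: read partition 0 from offset 12345 onward using virtual columns
SELECT device, temperature, _tp_sn
FROM table(kafka_stream)
WHERE _tp_shard = 0 AND _tp_sn >= 12345;
```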

Writing to Kafka

INSERT Statement

INSERT INTO kafka_stream (device, temperature) VALUES ('sensor1', 25.5);
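Several rows can be written in one statement. Whether they are produced as one Kafka message or one message per row depends on the stream's one_message_per_row setting; the values here are illustrative.

```sql
-- Batch insert of three readings
INSERT INTO kafka_stream (device, temperature)
VALUES ('sensor1', 25.5),
       ('sensor2', 19.0),
       ('sensor3', 22.75);
```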

Materialized View

CREATE MATERIALIZED VIEW mv_to_kafka INTO kafka_output_stream AS
  SELECT device, avg(temperature) AS avg_temp
  FROM input_stream
  GROUP BY device;
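The view above assumes kafka_output_stream already exists. A minimal sketch of defining it, plus a windowed variant that emits one average per device every 10 seconds instead of a running aggregate. The topic device_averages and the view name mv_windowed_to_kafka are hypothetical, and input_stream is assumed to have device and temperature columns.

```sql
-- Kafka sink whose columns match the SELECT aliases of the views
CREATE EXTERNAL STREAM kafka_output_stream (
  device string,
  avg_temp float64   -- avg() returns a 64-bit float
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='device_averages',    -- hypothetical topic
  one_message_per_row=true;

-- Windowed variant: one row per device per 10-second tumbling window
CREATE MATERIALIZED VIEW mv_windowed_to_kafka INTO kafka_output_stream AS
  SELECT device, avg(temperature) AS avg_temp
  FROM tumble(input_stream, 10s)
  GROUP BY window_start, device;
```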

Complete Example: Kafka to ClickHouse ETL

-- Read from Kafka
CREATE EXTERNAL STREAM kafka_input (
  device string,
  temperature float,
  timestamp int64
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='sensors';

-- Write to ClickHouse
CREATE EXTERNAL TABLE ch_output
SETTINGS
  type='clickhouse',
  address='clickhouse:9000',
  table='sensor_aggregates';

-- Transform and route data
CREATE MATERIALIZED VIEW mv_kafka_to_ch INTO ch_output AS
  SELECT
    window_start AS timestamp,
    device,
    avg(temperature) AS avg_temperature,
    max(temperature) AS max_temperature
  FROM tumble(kafka_input, 10s)
  GROUP BY window_start, device;
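To check the pipeline, you can compare the source with the sink. This sketch assumes the ClickHouse external table can also be queried from Proton; if your version only supports writes through it, run the second query in ClickHouse against sensor_aggregates instead.

```sql
-- How many messages the source topic currently holds
SELECT count() FROM table(kafka_input);

-- Spot-check the aggregates written to ClickHouse
SELECT * FROM ch_output LIMIT 10;
```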

AWS MSK Example

Complete example with IAM authentication:
CREATE EXTERNAL STREAM aws_msk_stream (
  device string,
  temperature float
)
SETTINGS
  type='kafka',
  brokers='b-1.mycluster.kafka.us-west-2.amazonaws.com:9098',
  topic='iot_data',
  security_protocol='SASL_SSL',
  sasl_mechanism='AWS_MSK_IAM';

-- Query the stream
SELECT device, count(*), avg(temperature)
FROM aws_msk_stream
GROUP BY device;

Schema Registry Support

For Avro and Protobuf with schema registry:
CREATE EXTERNAL STREAM avro_stream (
  -- Schema will be fetched from registry
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='avro_topic',
  data_format='Avro',
  schema_subject_name='avro_topic-value';

Best Practices

  1. Use SASL_SSL for production: Always encrypt data in transit
  2. Set appropriate timeouts: Increase consumer_stall_timeout_ms for topics that can stay idle for long periods
  3. Enable one_message_per_row: Use it with row-based formats when each row should be produced as its own Kafka message
  4. Partition wisely: Use _tp_message_key or sharding_expr for consistent partitioning
  5. Monitor offsets: Query _tp_sn to track processing progress
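A sketch of offset monitoring with _tp_sn, using the kafka_events stream from the basic example. It runs as a historical query for a one-off snapshot per partition; note that it scans all retained messages, so it can be expensive on large topics.

```sql
-- Highest offset and message count per partition, as a one-off snapshot
SELECT
  _tp_shard   AS partition_id,
  max(_tp_sn) AS latest_offset,
  count()     AS messages
FROM table(kafka_events)
GROUP BY _tp_shard
ORDER BY _tp_shard;
```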

Troubleshooting

Consumer Stalls

If consumers stall on idle topics, increase the timeout:
SETTINGS consumer_stall_timeout_ms=300000  -- 5 minutes

Connection Issues

Increase connection timeout for slower networks:
SETTINGS connection_timeout_ms=30000  -- 30 seconds
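Both timeouts are listed as stream settings in the Advanced Settings table, so they can be set when the stream is created. A sketch for a low-traffic topic on a slow network; the stream name, broker, and topic are hypothetical.

```sql
-- Hypothetical stream tolerant of idle topics and slow connections
CREATE EXTERNAL STREAM slow_network_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9092',
  topic='low_traffic_topic',
  consumer_stall_timeout_ms=300000,  -- 5 minutes without new messages
  connection_timeout_ms=30000;       -- 30 seconds to establish connections
```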

Enable Debug Logging

SETTINGS log_stats=true
