Timeplus Proton provides native integration with Apache Kafka and compatible platforms like Redpanda, Confluent Cloud, and AWS MSK through External Streams.
CREATE EXTERNAL STREAM
Create an external stream to read from or write to Kafka:
CREATE EXTERNAL STREAM stream_name (
column1 type1,
column2 type2,
...
)
SETTINGS
type='kafka',
brokers='broker1:9092,broker2:9092',
topic='topic_name';
Basic Example
CREATE EXTERNAL STREAM kafka_events (
device string,
temperature float,
timestamp int64
)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='sensor_data';
Authentication
SASL/PLAIN
CREATE EXTERNAL STREAM secure_stream (
data string
)
SETTINGS
type='kafka',
brokers='broker:9093',
topic='secure_topic',
security_protocol='SASL_SSL',
sasl_mechanism='PLAIN',
username='your_username',
password='your_password';
SASL/SCRAM
Supported mechanisms: SCRAM-SHA-256, SCRAM-SHA-512
CREATE EXTERNAL STREAM scram_stream (
data string
)
SETTINGS
type='kafka',
brokers='broker:9093',
topic='topic',
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
username='user',
password='password';
AWS MSK IAM Authentication
CREATE EXTERNAL STREAM aws_msk_stream (
device string,
temperature float
)
SETTINGS
type='kafka',
brokers='b-1.cluster.kafka.us-west-2.amazonaws.com:9098',
topic='my_topic',
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM';
SSL/TLS Configuration
CREATE EXTERNAL STREAM ssl_stream (
data string
)
SETTINGS
type='kafka',
brokers='broker:9093',
topic='topic',
security_protocol='SSL',
ssl_ca_cert_file='/path/to/ca-cert.pem',
skip_ssl_cert_check=false;
Alternatively, provide the CA certificate directly:
SETTINGS
...
ssl_ca_pem='-----BEGIN CERTIFICATE-----\n...';
Configuration Options
Core Settings
| Setting | Type | Description | Required |
|---|---|---|---|
| type | String | Must be 'kafka' or 'redpanda' | Yes |
| brokers | String | Comma-separated list of brokers | Yes |
| topic | String | Kafka topic name | Yes |
| security_protocol | String | Protocol: plaintext, SASL_PLAINTEXT, SASL_SSL, SSL | No (default: plaintext) |
| sasl_mechanism | String | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM | No |
| username | String | SASL username | No |
| password | String | SASL password | No |
| ssl_ca_cert_file | String | Path to CA certificate file | No |
| ssl_ca_pem | String | CA certificate string (PEM format) | No |
| skip_ssl_cert_check | Bool | Skip SSL certificate verification | No (default: false) |
Advanced Settings
| Setting | Type | Description | Default |
|---|---|---|---|
| properties | String | Semicolon-separated Kafka client properties | Empty |
| one_message_per_row | Bool | Produce one message per row for row-based formats | false |
| data_format | String | Message format (e.g., JSONEachRow, Avro) | JSONEachRow |
| poll_waittime_ms | UInt64 | Poll wait time in milliseconds | 500 |
| consumer_stall_timeout_ms | Milliseconds | Consumer stall timeout | 60000 |
| connection_timeout_ms | Milliseconds | Connection timeout | 10000 |
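As a minimal sketch of how the advanced settings fit into a stream definition, the statement below sets data_format and one_message_per_row explicitly. The stream name kafka_alerts and the topic alerts are placeholders chosen for illustration, not names from a real deployment.

```sql
-- Hypothetical stream and topic names; adjust to your environment.
CREATE EXTERNAL STREAM kafka_alerts (
  device string,
  temperature float
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='alerts',
  data_format='JSONEachRow',   -- the documented default, stated explicitly
  one_message_per_row=true;    -- each inserted row becomes its own Kafka message
```

With one_message_per_row left at its default of false, several rows may be written into a single Kafka message for row-based formats.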
Custom Kafka Properties
Use the properties setting to pass additional Kafka client configurations:
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='topic',
properties='message.max.bytes=1048576;compression.type=snappy;max.in.flight.requests.per.connection=5';
Format: key1=value1;key2=value2;...
Reserved Virtual Columns
Proton provides virtual columns for Kafka metadata:
| Column | Type | Description |
|---|---|---|
| _tp_append_time | DateTime64(3, 'UTC') | Message append timestamp |
| _tp_event_time | DateTime64(3, 'UTC') | Event timestamp |
| _tp_process_time | DateTime64(3, 'UTC') | Processing timestamp |
| _tp_shard | Int32 | Kafka partition ID |
| _tp_sn | Int64 | Kafka offset (sequence number) |
| _tp_message_key | String | Message key |
| _tp_message_headers | Map(String, String) | Message headers |
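The virtual columns are not part of the declared schema, so a query has to name them to see them. The sketch below reads a few of them alongside the data columns of the kafka_events stream from the Basic Example; it assumes that stream exists and that the topic has messages to read.

```sql
-- Select Kafka metadata next to the payload columns.
SELECT
  device,
  temperature,
  _tp_shard,        -- Kafka partition ID
  _tp_sn,           -- Kafka offset within that partition
  _tp_message_key   -- message key
FROM kafka_events;
```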
Using Message Keys
Define _tp_message_key as a physical column to control message keys when writing:
CREATE EXTERNAL STREAM keyed_stream (
device string,
value float,
_tp_message_key string
)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='keyed_topic';
Supported key types: numeric types, string, and their nullable variants.
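Once _tp_message_key is declared as a column, it can be set like any other column on insert. A minimal sketch, reusing the keyed_stream definition above and using the device name as the key (an illustrative choice, not a requirement):

```sql
-- The value given for _tp_message_key becomes the Kafka message key.
INSERT INTO keyed_stream (device, value, _tp_message_key)
VALUES ('sensor1', 25.5, 'sensor1');
```

Keying by device is a common way to keep all messages for one device in the same partition, because Kafka's default partitioner hashes the key.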
Reading from Kafka
Streaming Query
-- Read new messages continuously
SELECT * FROM kafka_stream;
Historical Query
-- Read all existing messages
SELECT * FROM table(kafka_stream);
Read Specific Partitions
SELECT * FROM kafka_stream SETTINGS shards='0,2,5';
Seek to Position
-- Start from beginning
SELECT * FROM kafka_stream SETTINGS seek_to='earliest';
-- Start from latest
SELECT * FROM kafka_stream SETTINGS seek_to='latest';
-- Start from specific offset
SELECT * FROM kafka_stream SETTINGS seek_to='12345';
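The partition filter and the seek position are both query-level settings, so a natural next step is to use them together. The sketch below assumes the two can be combined in one SETTINGS clause, separated by a comma; verify this against your Proton version before relying on it.

```sql
-- Assumption: shards and seek_to may appear in the same SETTINGS clause.
-- Read partitions 0 and 2 only, starting from the earliest available offset.
SELECT * FROM kafka_stream SETTINGS shards='0,2', seek_to='earliest';
```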
Writing to Kafka
INSERT Statement
INSERT INTO kafka_stream (device, temperature) VALUES ('sensor1', 25.5);
Materialized View
CREATE MATERIALIZED VIEW mv_to_kafka INTO kafka_output_stream AS
SELECT device, avg(temperature) AS avg_temp
FROM input_stream
GROUP BY device;
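The materialized view above writes into kafka_output_stream, which must already exist with columns that match the SELECT list. A possible definition is sketched below; the topic name device_averages is a placeholder, and float64 is assumed as the column type because avg() typically returns a 64-bit float.

```sql
-- Hypothetical target stream for mv_to_kafka; create it before the view.
CREATE EXTERNAL STREAM kafka_output_stream (
  device string,
  avg_temp float64
)
SETTINGS
  type='kafka',
  brokers='localhost:9092',
  topic='device_averages';
```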
Complete Example: Kafka to ClickHouse ETL
-- Read from Kafka
CREATE EXTERNAL STREAM kafka_input (
device string,
temperature float,
timestamp int64
)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='sensors';
-- Write to ClickHouse
CREATE EXTERNAL TABLE ch_output
SETTINGS
type='clickhouse',
address='clickhouse:9000',
table='sensor_aggregates';
-- Transform and route data
CREATE MATERIALIZED VIEW mv_kafka_to_ch INTO ch_output AS
SELECT
window_start AS timestamp,
device,
avg(temperature) AS avg_temperature,
max(temperature) AS max_temperature
FROM tumble(kafka_input, 10s)
GROUP BY window_start, device;
AWS MSK Example
Complete example with IAM authentication:
CREATE EXTERNAL STREAM aws_msk_stream (
device string,
temperature float
)
SETTINGS
type='kafka',
brokers='b-1.mycluster.kafka.us-west-2.amazonaws.com:9098',
topic='iot_data',
security_protocol='SASL_SSL',
sasl_mechanism='AWS_MSK_IAM';
-- Query the stream
SELECT device, count(*), avg(temperature)
FROM aws_msk_stream
GROUP BY device;
Schema Registry Support
For Avro and Protobuf with schema registry:
CREATE EXTERNAL STREAM avro_stream (
-- Schema will be fetched from registry
)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='avro_topic',
data_format='Avro',
schema_subject_name='avro_topic-value';
Best Practices
- Use SASL_SSL for production: Always encrypt data in transit
- Set appropriate timeouts: Adjust consumer_stall_timeout_ms based on topic activity
- Enable one_message_per_row: For row-based formats when you need per-row control
- Partition wisely: Use _tp_message_key or sharding_expr for consistent partitioning
- Monitor offsets: Query _tp_sn to track processing progress
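For the offset-monitoring practice, one possible approach is to aggregate _tp_sn per partition. The sketch below uses the kafka_events stream from the Basic Example and assumes that virtual columns can be used inside aggregate functions and GROUP BY.

```sql
-- Track the highest offset seen so far in each Kafka partition.
SELECT
  _tp_shard,
  max(_tp_sn) AS latest_offset
FROM kafka_events
GROUP BY _tp_shard;
```

As a streaming query, this keeps running and emits updated results as new messages arrive.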
Troubleshooting
Consumer Stalls
If consumers stall on idle topics, increase the timeout:
SETTINGS consumer_stall_timeout_ms=300000 -- 5 minutes
Connection Issues
Increase connection timeout for slower networks:
SETTINGS connection_timeout_ms=30000 -- 30 seconds
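The two SETTINGS fragments above can be placed in a full stream definition. The sketch below assumes both timeouts are set when the external stream is created, as their listing under Advanced Settings suggests; the stream name is a placeholder.

```sql
-- Hypothetical stream name; both timeout values are taken from the fragments above.
CREATE EXTERNAL STREAM slow_network_stream (
  data string
)
SETTINGS
  type='kafka',
  brokers='broker:9093',
  topic='topic',
  consumer_stall_timeout_ms=300000,  -- 5 minutes
  connection_timeout_ms=30000;       -- 30 seconds
```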
Enable Debug Logging