CREATE EXTERNAL STREAM creates a connection to external data sources such as Kafka, Pulsar, ClickHouse, and others. External streams allow you to read from and write to external systems without storing data in Timeplus.Documentation Index
Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt
Use this file to discover all available pages before exploring further.
Syntax
Common Parameters
The name of the external stream to create.
If specified, the statement will not raise an error if the stream already exists.
The type of external stream. Options:
kafka, pulsar, nats, timeplus, http, icebergThe message format for serialization/deserialization. Examples:
JSONEachRow, CSV, Avro, Protobuf, RawBLOBKafka External Stream
Required Settings
Comma-separated list of Kafka broker addresses. Example:
'localhost:9092' or 'broker1:9092,broker2:9092'The Kafka topic name to read from or write to.
Authentication Settings
The security protocol for connecting to Kafka. Options:
plaintext(default): No encryption or authenticationSASL_SSL: SASL authentication over SSL/TLSSASL_PLAINTEXT: SASL authentication without encryptionSSL: SSL/TLS encryption without SASL
SASL mechanism for authentication. Options:
PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAMUsername for SASL authentication.
Password for SASL authentication.
SSL/TLS Settings
Path to the CA certificate file for SSL/TLS verification.
CA certificate string in PEM format for verifying the server’s key.
If true, the server’s certificate won’t be verified. Default: false.
AWS MSK IAM Authentication
AWS region for MSK IAM authentication.
AWS access key ID.
AWS secret access key.
AWS session token for temporary credentials.
Use credentials from environment variables. Default: false.
Advanced Settings
Semicolon-separated key-value pairs for additional Kafka client properties.
Example:
'client.id=my-client;group.id=my-group'How long poll should wait in milliseconds. Default: 500.
If true, produces one Kafka message per row when using row-based formats like JSONEachRow. Default: false.
Expression to calculate partition ID for data distribution.
Avro/Protobuf schema subject name in the schema registry.
Time in milliseconds after which a stalled consumer is recreated. Default: 60000. Set to 0 to disable.
Timeout in milliseconds for establishing a broker connection. Default: 10000.
Kafka Examples
Basic Kafka External Stream
Kafka with SASL Authentication
AWS MSK with IAM Authentication
Kafka with Avro Schema Registry
Debezium CDC from Kafka
Pulsar External Stream
Pulsar Settings
The Pulsar service URL. Example:
'pulsar://localhost:6650' or 'pulsar+ssl://localhost:6651'The Pulsar topic name.
Accept untrusted TLS certificates from brokers. Default: false.
Validate hostname when connecting over TLS. Default: false.
CA certificate in PEM format for server verification.
Client certificate in PEM format for mTLS authentication.
Client private key in PEM format for mTLS authentication.
JSON Web Token for JWT authentication.
Maximum connections per broker. Default: 1.
Memory limit in bytes. 0 means unlimited. Default: 0.
Number of I/O threads. Default: 1.
Pulsar Example
NATS JetStream External Stream
NATS Settings
NATS server URL. Example:
'nats://localhost:4222'Subject filter for subscribing. Supports wildcards:
orders.> or orders.*JetStream stream name (must already exist).
Durable consumer name. Auto-generated if empty.
Create a durable consumer. Default: true.
Acknowledgment policy:
none, all, explicit. Default: explicit.Delivery policy:
all, last, new, by_start_sequence, by_start_time. Default: all.Messages to fetch per pull request. Default: 256.
HTTP External Stream
HTTP Settings
HTTP endpoint for both read and write.
HTTP endpoint for read operations.
HTTP endpoint for write operations.
HTTP method for reading. Default:
GET.HTTP method for writing. Default:
POST.HTTP body compression:
none, gzip, br, deflate. Default: none.Use chunked transfer encoding. Default: true.
Special Columns
External streams support special columns for metadata:_tp_time- Event timestamp_tp_message_key- Message key (for Kafka/Pulsar)_tp_partition- Partition ID_tp_offset- Message offset_tp_sn- Sequence number
See Also
- CREATE STREAM - Create internal streams
- CREATE MATERIALIZED VIEW - Process external stream data
- SELECT - Query external streams
- INSERT - Write to external streams