The Kafka connector lets you read from and write to Apache Kafka topics using Flink SQL and the Table API. It is maintained in a separate repository: apache/flink-connector-kafka.
Dependency
The Kafka connector is externalized from the main Flink repository. Add the flink-sql-connector-kafka dependency to your project to use it.
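If you work from the SQL client, one way to make the connector available is an ADD JAR statement. The path and version below are placeholders for whichever flink-sql-connector-kafka artifact you downloaded:

```sql
-- Register the externalized Kafka connector with the SQL client.
-- Path and version are placeholders; point this at your local copy
-- of the flink-sql-connector-kafka JAR.
ADD JAR '/path/to/flink-sql-connector-kafka-3.2.0-1.19.jar';
```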
Source table
The Kafka connector creates an unbounded scan source that continuously reads records from one or more Kafka topics.
Scan startup modes
The `scan.startup.mode` option controls where the Kafka consumer begins reading:

| Mode | Description |
|---|---|
| `earliest-offset` | Start from the earliest available offset in each partition. |
| `latest-offset` | Start from the latest offset, reading only new records. |
| `group-offsets` | Resume from the committed offsets of the consumer group. Falls back to `latest-offset` if no committed offset exists. |
| `timestamp` | Start from the first offset whose timestamp is greater than or equal to `scan.startup.timestamp-millis`. |
| `specific-offsets` | Start from per-partition offsets specified in `scan.startup.specific-offsets`. |
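For example, a source table that reads a topic from the beginning might look like the following sketch; the schema, topic, brokers, and group ID are placeholders:

```sql
CREATE TABLE orders_source (
  order_id   STRING,
  amount     DECIMAL(10, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                                -- placeholder topic
  'properties.bootstrap.servers' = 'broker-1:9092',  -- placeholder brokers
  'properties.group.id' = 'orders-consumer',         -- placeholder group ID
  'scan.startup.mode' = 'earliest-offset',           -- see the table above
  'format' = 'json'
);
```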
Metadata columns
The Kafka connector exposes the following metadata fields:

| Key | Type | Description |
|---|---|---|
| `topic` | STRING NOT NULL | Name of the topic the record belongs to. |
| `partition` | INT NOT NULL | Partition ID. |
| `headers` | MAP<STRING, BYTES> | Kafka record headers. |
| `leader-epoch` | INT | Leader epoch of the partition. |
| `offset` | BIGINT NOT NULL | Offset within the partition. |
| `timestamp` | TIMESTAMP_LTZ(3) NOT NULL | Kafka record timestamp. |
| `timestamp-type` | STRING NOT NULL | Timestamp type: NoTimestampType, CreateTime, or LogAppendTime. |
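Metadata fields are declared with the METADATA keyword; read-only fields must be marked VIRTUAL so they are excluded when the table is used as a sink. A brief sketch with placeholder names:

```sql
CREATE TABLE orders_with_meta (
  order_id     STRING,
  amount       DECIMAL(10, 2),
  -- Read-only Kafka metadata, marked VIRTUAL so it is skipped on writes.
  kafka_topic  STRING METADATA FROM 'topic' VIRTUAL,
  kafka_offset BIGINT METADATA FROM 'offset' VIRTUAL,
  -- The record timestamp can be read and written, so no VIRTUAL here.
  event_time   TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker-1:9092',
  'properties.group.id' = 'orders-consumer',
  'format' = 'json'
);
```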
Sink table
The Kafka connector creates a streaming sink that appends records to a Kafka topic.
Using Avro format
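A minimal sketch of a sink that writes Avro-encoded records, assuming the Avro format dependency (e.g. flink-sql-avro) is on the classpath; all names are placeholders:

```sql
CREATE TABLE orders_sink (
  order_id   STRING,
  amount     DECIMAL(10, 2),
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-out',                            -- placeholder topic
  'properties.bootstrap.servers' = 'broker-1:9092',  -- placeholder brokers
  'format' = 'avro'                                  -- serialize records as Avro
);

-- Append records from an upstream table into the sink.
INSERT INTO orders_sink
SELECT order_id, amount, order_time FROM orders_source;
```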
Key and value formats
To configure a separate key format (for example, for upsert semantics), use the `key.format` and `value.format` options:
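A sketch with JSON on both sides; `key.fields` lists the columns serialized into the message key, and `value.fields-include = 'EXCEPT_KEY'` keeps those columns out of the value. Names are placeholders:

```sql
CREATE TABLE user_events (
  user_id    STRING,
  event_type STRING,
  payload    STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',                           -- placeholder topic
  'properties.bootstrap.servers' = 'broker-1:9092',  -- placeholder brokers
  'key.format' = 'json',
  'key.fields' = 'user_id',             -- user_id becomes the message key
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY' -- value carries the other columns
);
```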
Key connector options
| Option | Required | Default | Description |
|---|---|---|---|
| `connector` | Yes | — | Must be `'kafka'`. |
| `topic` | Yes (sink) | — | Topic name(s) to read from or write to. Use semicolons to separate multiple source topics. A source requires either `topic` or `topic-pattern`. |
| `topic-pattern` | No | — | Java regex pattern to match source topic names dynamically. |
| `properties.bootstrap.servers` | Yes | — | Comma-separated list of Kafka broker addresses. |
| `properties.group.id` | Yes (source) | — | Consumer group ID. |
| `scan.startup.mode` | No | `group-offsets` | Where to start reading. |
| `scan.startup.timestamp-millis` | No | — | Start timestamp in epoch milliseconds. Used with `scan.startup.mode` = `'timestamp'`. |
| `scan.startup.specific-offsets` | No | — | Per-partition start offsets, e.g. `partition:0,offset:42;partition:1,offset:300`. |
| `format` | Yes (if `key.format` / `value.format` are not set) | — | Format for both key and value. |
| `key.format` | No | — | Format for the message key. |
| `key.fields` | No | — | Semicolon-separated list of fields to include in the key. |
| `value.format` | No | — | Format for the message value. |
| `value.fields-include` | No | `ALL` | Which fields to include in the value: `ALL` or `EXCEPT_KEY`. |
| `sink.partitioner` | No | `default` | Partitioning strategy for the sink: `default`, `fixed`, `round-robin`, or a custom class name. |
| `sink.delivery-guarantee` | No | `at-least-once` | Delivery guarantee for the sink: `at-least-once` or `exactly-once`. Exactly-once requires Kafka transactions. |
| `sink.transactional-id-prefix` | No | — | Required when `sink.delivery-guarantee` = `'exactly-once'`. Prefix for Kafka transactional IDs. |
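For exactly-once delivery, the last two options work together; a brief sketch with placeholder names (note that the brokers' transaction.max.timeout.ms must be large enough for the producer's transaction timeout):

```sql
CREATE TABLE payments_sink (
  payment_id STRING,
  amount     DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'payments-out',                          -- placeholder topic
  'properties.bootstrap.servers' = 'broker-1:9092',  -- placeholder brokers
  'format' = 'json',
  -- Exactly-once uses Kafka transactions; the prefix must be unique
  -- across applications writing to the same cluster.
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'payments-sink-tx'
);
```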

