

The DataStream API is Flink’s core API for building data stream processing programs. It lets you express transformations on potentially unbounded streams of data — filtering, mapping, aggregating, windowing — and connect sources to sinks to produce results. A DataStream is an immutable, distributed collection of data. You cannot inspect its elements directly or mutate them in place. Instead, you chain API operations that describe a computation graph, then trigger execution with env.execute().

Program structure

Every DataStream program follows the same five-step pattern:
1. Obtain a StreamExecutionEnvironment

The StreamExecutionEnvironment is the entry point for every Flink program. It holds configuration and creates data sources.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Use getExecutionEnvironment() in almost all cases. It detects context automatically: it creates a local, single-JVM environment when you run from an IDE, and connects to a cluster when you submit a JAR with bin/flink run.

Two other factory methods exist for specific cases:
// Explicitly create a local environment (useful in tests)
StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment();

// Connect to a specific remote cluster
StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
    "host", 6123, "/path/to/job.jar"
);
2. Load or create initial data

Attach a source to the environment to get a DataStream. Flink provides built-in sources for collections, files, sockets, and sequences, plus a connector ecosystem for Kafka, Kinesis, and more.
// From a bounded collection (useful for testing)
DataStream<String> words = env.fromData("hello", "world", "hello", "flink");

// From a file
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///data/input"))
    .build();
DataStream<String> lines = env.fromSource(
    fileSource, WatermarkStrategy.noWatermarks(), "file-source"
);

// From a socket (useful for quick experiments)
DataStream<String> socket = env.socketTextStream("localhost", 9999);
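The built-in sources above are handy for testing and experiments; production jobs typically ingest from a connector instead. As a sketch only, assuming the flink-connector-kafka dependency is on the classpath, a Kafka source can be attached the same way (the broker address, topic, and group id below are placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Placeholder broker, topic, and group id — substitute your own.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("input-topic")
    .setGroupId("my-consumer-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

// Attach it exactly like the FileSource above.
DataStream<String> kafkaLines = env.fromSource(
    kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source"
);
```

Like the FileSource example, the source is wired in through env.fromSource() with a WatermarkStrategy and a source name.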
3. Apply transformations

Transformations produce new DataStream instances from existing ones. They are lazy — nothing executes until you call execute().
DataStream<String> words = env.fromData("hello world", "hello flink");

DataStream<String> split = words.flatMap(
    (String line, Collector<String> out) -> {
        for (String word : line.split(" ")) {
            out.collect(word);
        }
    }
).returns(String.class);

DataStream<String> filtered = split.filter(word -> !word.isEmpty());
4. Write results to a sink

Sinks consume a DataStream and write records to external systems. For production use, prefer sinkTo() with the Sink API over the older addSink().
// Print to stdout — good for development
filtered.print();

// Write to files (exactly-once when checkpointing is enabled)
FileSink<String> fileSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>())
    .build();
filtered.sinkTo(fileSink);
5. Trigger execution

Flink programs are evaluated lazily. Calling execute() submits the job graph for execution and blocks until the job finishes.
env.execute("My Streaming Job");
Use executeAsync() if you want to submit without blocking:
JobClient client = env.executeAsync("My Streaming Job");
JobExecutionResult result = client.getJobExecutionResult().get();

Complete example: streaming word count

The following program reads words from a socket, counts them in 5-second tumbling windows, and prints results to stdout.
WindowWordCount.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Tokenizer())
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
            .sum(1);

        counts.print();

        env.execute("Window WordCount");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) {
            for (String word : sentence.split("\\s")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
Start a netcat listener to feed the program input:
nc -lk 9999
Then run the program. Type words into the netcat terminal and watch counts appear.

Key concepts

Lazy evaluation: Calling .map(), .filter(), or .keyBy() builds a dataflow graph in memory. No data moves until execute() is called. This lets Flink optimize the full graph before execution begins.

Parallelism: Each operator runs as one or more parallel instances. Set job-level parallelism with env.setParallelism(n) or per-operator with .map(...).setParallelism(n).

Operator chaining: Flink automatically chains adjacent operators that can share a thread (for example, consecutive map() calls) to reduce serialization overhead. You can disable chaining globally or per operator.

Buffer timeout: By default, Flink buffers records for up to 100 ms before flushing to downstream operators. Lower this for latency-sensitive pipelines:
env.setBufferTimeout(10); // milliseconds
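The parallelism and chaining knobs described above can be combined in one place. The following is an illustrative sketch, not a recommended configuration; the pipeline itself is a throwaway example:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Job-level default: every operator runs with 4 parallel instances unless overridden.
env.setParallelism(4);

// Global switch: disable chaining for the whole job (mainly useful when profiling).
// env.disableOperatorChaining();

DataStream<String> upper = env
    .fromData("a", "b", "c")
    .map(String::toUpperCase)
    .setParallelism(2)     // per-operator override of the job-level default
    .disableChaining();    // keep this operator out of any chain, in its own thread

// Flush network buffers after at most 10 ms instead of the default 100 ms.
env.setBufferTimeout(10);
```

Per-operator settings such as setParallelism() and disableChaining() are called on the operator returned by the transformation, so they apply only to that operator.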

Explore further

Execution Mode

Switch between STREAMING and BATCH execution modes for bounded data.

Data Sources

Built-in sources, the FLIP-27 Source API, and FileSource.

Operators

map, flatMap, filter, keyBy, window, connect, and more.

Data Sinks

FileSink, print, custom sinks, and exactly-once delivery.

Event Time & Watermarks

WatermarkStrategy, TimestampAssigner, and windowing with event time.

Fault Tolerance

Checkpointing, restart strategies, and exactly-once guarantees.

Working with State

ValueState, ListState, MapState, and state TTL.

Side Outputs

Route records to multiple output streams from a single operator.
