Documentation Index
Fetch the complete documentation index at: https://mintlify.com/apache/beam/llms.txt
Use this file to discover all available pages before exploring further.
This page demonstrates advanced streaming patterns in Apache Beam, including windowing strategies, trigger mechanisms, and handling late data.
Streaming Word Count
A basic streaming pipeline that reads from PubSub and processes data in fixed windows.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window
def run_streaming_wordcount(input_topic, output_topic):
    """Run a streaming word count between two Pub/Sub topics.

    Messages read from ``input_topic`` are decoded as UTF-8, split on
    whitespace and counted per word inside 15-second fixed windows. Each
    ``(word, count)`` result is published to ``output_topic`` as the UTF-8
    encoded string ``"word: count"``.

    Args:
        input_topic: Pub/Sub topic to read messages from.
        output_topic: Pub/Sub topic to publish the counts to.
    """
    # Passing streaming=True to the constructor already switches the
    # StandardOptions view into streaming mode.
    options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=options) as pipeline:
        payloads = pipeline | 'Read' >> beam.io.ReadFromPubSub(
            topic=input_topic).with_output_types(bytes)
        text_lines = payloads | 'Decode' >> beam.Map(
            lambda payload: payload.decode('utf-8'))

        # Pair every word with 1, window into 15 s buckets, then add up the
        # ones collected for each word within each window.
        word_counts = (
            text_lines
            | 'ExtractWords' >> beam.FlatMap(lambda line: line.split())
            | 'PairWithOne' >> beam.Map(lambda word: (word, 1))
            | 'Window' >> beam.WindowInto(window.FixedWindows(15))
            | 'GroupByKey' >> beam.GroupByKey()
            | 'Sum' >> beam.MapTuple(lambda word, ones: (word, sum(ones))))

        # Render "word: count" lines and publish them.
        _ = (
            word_counts
            | 'Format' >> beam.Map(lambda wc: f'{wc[0]}: {wc[1]}'.encode('utf-8'))
            | 'Write' >> beam.io.WriteToPubSub(output_topic))
Key Concepts:
- Set `streaming=True` in the pipeline options
- Use `WindowInto` with `FixedWindows` to create 15-second windows
- Data is processed continuously as it arrives
import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
public class StreamingWordCount {
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
p.apply("Read", PubsubIO.readStrings().fromTopic(inputTopic))
.apply("ExtractWords", FlatMapElements
.into(TypeDescriptors.strings())
.via((String line) -> Arrays.asList(line.split("\\s+"))))
.apply("Window", Window.into(
FixedWindows.of(Duration.standardSeconds(15))))
.apply("Count", Count.perElement())
.apply("Format", MapElements
.into(TypeDescriptors.strings())
.via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
.apply("Write", PubsubIO.writeStrings().to(outputTopic));
p.run();
}
}
Advanced Windowing with Triggers
Demonstrating different trigger types for controlling when results are emitted.
Default Trigger (Watermark-based)
The default trigger fires when the watermark passes the end of the window.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# The Python SDK takes window sizes and allowed lateness in seconds (numbers or
# apache_beam.utils.timestamp.Duration). Joda-Time's Duration is a Java-only
# library and cannot be imported from Python.

# Default trigger - fires once when the watermark passes the window end.
# NOTE: `data` is assumed to be an existing PCollection defined elsewhere.
default_windowed = (
    data
    | 'FixedWindows' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(),
        allowed_lateness=0,  # zero lateness: late data is dropped
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )
)
Behavior:
- Fires once when the watermark passes the window end
- Produces ON_TIME results
- Late data is dropped (zero allowed lateness)
Handling Late Data
Allow late data processing with allowed lateness.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Allow late data for up to 1 day.
# A bare AfterWatermark() fires exactly once and then finishes, after which
# late elements for the window are dropped even though allowed_lateness is set.
# The `late` sub-trigger (evaluated repeatedly by AfterWatermark) emits a new
# LATE pane for each late element, matching the Java version's
# Repeatedly.forever(AfterWatermark.pastEndOfWindow()).
# NOTE: `data` is assumed to be an existing keyed PCollection defined elsewhere.
with_late_data = (
    data
    | 'WindowWithLateness' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        allowed_lateness=24 * 60 * 60,  # 1 day
        # Each pane carries only the elements that arrived since the last firing.
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )
    | 'CountPerWindow' >> beam.CombinePerKey(sum)
)
Key Points:
- Windows stay open for 1 day after watermark passes
- Each late element triggers a new pane (LATE timing)
- Use DISCARDING mode to get incremental updates
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Sum values per key in 30-minute fixed windows. Repeatedly.forever re-arms the watermark
// trigger, so after the on-time pane each late element (up to 1 day behind the watermark)
// fires a new pane; discardingFiredPanes() makes each pane hold only newly arrived data.
// NOTE: `flowInfo` is assumed to be an existing PCollection<KV<String, Integer>>.
PCollection<KV<String, Integer>> withLateData = flowInfo
    .apply(Window
        .<KV<String, Integer>>into(
            FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(
            AfterWatermark.pastEndOfWindow()))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)))
    // Sum.integersPerKey() is Beam's built-in per-key integer sum, so no user-defined
    // combiner class is needed.
    .apply(Sum.integersPerKey());
Speculative Results (Early Firings)
Get early approximations before all data arrives.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Fire early results every minute.
# Only a processing-time trigger is used, so panes are emitted on a timer
# (about one minute after the first element of each pane) regardless of the
# watermark. ACCUMULATING mode makes every pane a refinement containing all
# data seen so far for the window.
# NOTE: `data` is assumed to be an existing keyed PCollection defined elsewhere.
speculative = (
    data
    | 'SpeculativeWindow' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.Repeatedly(
            trigger.AfterProcessingTime(60)  # Every 1 minute
        ),
        allowed_lateness=24 * 60 * 60,  # 1 day
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
    )
    | 'Aggregate' >> beam.CombinePerKey(sum)
)
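Use Case:
- Get quick approximations for dashboards
- Progressive refinement of results
- The trigger fires on processing time only and never waits for the watermark. Panes emitted before the watermark passes the end of the window are marked EARLY; panes emitted after that point are marked ON_TIME or LATE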
Combined Trigger Strategy
Combine early firings, on-time results, and late data handling.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Complete trigger strategy: early, on-time and late panes.
# AfterAll(...) only fires once *all* of its sub-triggers have fired, so it
# cannot emit early panes before the watermark. AfterWatermark's `early` and
# `late` arguments express the intended timeline directly; both sub-triggers
# are evaluated repeatedly before and after the watermark, respectively.
# NOTE: `data` is assumed to be an existing keyed PCollection defined elsewhere.
combined = (
    data
    | 'CombinedTrigger' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(
            early=trigger.AfterProcessingTime(60),  # Early: every 1 min
            late=trigger.AfterProcessingTime(5 * 60),  # Late: every 5 min
        ),
        allowed_lateness=24 * 60 * 60,  # 1 day
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
    )
)
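Timeline:
- EARLY panes: 1 minute (processing time) after the first element of each pane, until the watermark passes the window end
- ON_TIME pane: when the watermark passes the window end
- LATE panes: 5 minutes after the first late element of each pane, until the allowed lateness (1 day) expires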
import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;
// Sequential trigger over 30-minute fixed windows, built with AfterEach.inOrder:
//  1. Early phase: a pane 1 minute (processing time) after the first element of each pane,
//     repeated until the watermark passes the end of the window (orFinally), which emits
//     the on-time pane and ends this phase.
//  2. Late phase: a pane 5 minutes after the first element of each subsequent pane, for as
//     long as the 1-day allowed lateness keeps the window open.
// accumulatingFiredPanes(): every pane contains all data seen so far for the window.
// NOTE(review): `flowInfo` is assumed to be an existing PCollection<KV<String, Integer>>;
// PCollection and KV (org.apache.beam.sdk.values) are not imported in this snippet.
PCollection<KV<String, Integer>> sequential = flowInfo
.apply(Window
.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(30)))
.triggering(
AfterEach.inOrder(
// Phase 1: early panes every minute until the watermark passes the window end.
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.orFinally(AfterWatermark.pastEndOfWindow()),
// Phase 2: late panes, 5 minutes after the first late element of each pane.
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardDays(1)));
Windowing with Timestamps
Access window information in your pipeline for metadata enrichment.
import apache_beam as beam
from apache_beam.transforms import window
class FormatWithWindow(beam.DoFn):
    """Turns each ``(word, count)`` pair into a dict tagged with its window bounds.

    Beam injects the element's window through the ``beam.DoFn.WindowParam``
    default. The window's ``start`` and ``end`` timestamps are converted to
    UTC datetimes and rendered as ISO-8601 strings.
    """

    def process(self, element, window=beam.DoFn.WindowParam):
        """Yield the word, its count and the window start/end times."""
        word, count = element
        record = {'word': word, 'count': count}
        # Insertion order keeps window_start before window_end in the output.
        for field, boundary in (('window_start', window.start),
                                ('window_end', window.end)):
            record[field] = boundary.to_utc_datetime().isoformat()
        yield record
# Apply in pipeline: attach window start/end to every windowed (word, count) pair.
# NOTE: `windowed_counts` is assumed to be an existing windowed PCollection.
results = windowed_counts | 'AddWindowInfo' >> beam.ParDo(FormatWithWindow())
Session Windows
Group events based on activity sessions with gaps of inactivity.
import apache_beam as beam
from apache_beam.transforms import window

# Create session windows with 10-minute gaps: elements whose event times are
# less than 10 minutes apart fall into the same session.
# Count.PerElement() then counts occurrences of each distinct element within
# each of its sessions.
# NOTE: `events` is assumed to be an existing timestamped PCollection.
sessions = (
    events
    | 'SessionWindows' >> beam.WindowInto(
        window.Sessions(10 * 60)  # 10-minute gap duration
    )
    | 'CountPerSession' >> beam.combiners.Count.PerElement()
)
Use Cases:
- User session analytics
- Detecting periods of activity
- Grouping related events
Sliding Windows
Create overlapping windows for moving averages and continuous analysis.
import apache_beam as beam
from apache_beam.transforms import window

# 1-hour windows, sliding every 5 minutes: each element lands in
# 60 / 5 = 12 overlapping windows, and the mean is computed per key per window.
# NOTE: `metrics` is assumed to be an existing keyed PCollection of numbers.
sliding = (
    metrics
    | 'SlidingWindows' >> beam.WindowInto(
        window.SlidingWindows(
            size=60 * 60,  # 1 hour window
            period=5 * 60,  # Slide every 5 minutes
        )
    )
    | 'ComputeAverage' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
)
Best Practices
Choose Appropriate Windows
- Fixed windows for regular intervals
- Session windows for user activity
- Sliding windows for moving calculations
Configure Allowed Lateness
- Balance completeness vs. resource usage
- Consider your data’s lateness characteristics
- Use watermark estimators for better accuracy
Select Accumulation Mode
- DISCARDING for independent updates
- ACCUMULATING for cumulative results
- Consider storage and computation trade-offs
Monitor Watermarks
- Track watermark lag in production
- Adjust allowed lateness based on metrics
- Use custom watermark estimators if needed
Common Patterns
Traffic Analysis Example
Based on examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java:160-337
// Process traffic sensor data with multiple trigger strategies
// All four applications below window the same input into 30-minute fixed windows and differ
// only in trigger, allowed lateness and accumulation mode. Their results are not assigned
// here; in a real pipeline each would feed further transforms.
// NOTE(review): the initializer below is a placeholder and does not compile as written;
// substitute the actual source of KV<String, Integer> records.
PCollection<KV<String, Integer>> flowInfo = /* input data */;
// 1. Default trigger - watermark only
// Fires when the watermark passes the end of the window; with zero allowed lateness,
// data arriving after that is dropped.
flowInfo
.apply(Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
// 2. With allowed lateness - capture late data
// Same watermark trigger, but windows stay open for 1 day after the watermark passes;
// Repeatedly.forever re-arms the trigger so each late element produces a new pane.
// Discarding mode: each pane holds only the elements that arrived since the last firing.
flowInfo
.apply(Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(1)));
// 3. Speculative - early approximations
// Processing-time trigger only: a pane fires 1 minute after the first element of each
// pane, independently of the watermark. Accumulating mode: each pane includes all data
// seen so far for the window.
flowInfo
.apply(Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardDays(1)));
// 4. Sequential - early, on-time, and late
// AfterEach.inOrder: first, panes 1 minute after the first element of each pane until the
// watermark passes the end of the window (orFinally); then panes 5 minutes after the first
// element of each later pane, for up to 1 day of allowed lateness.
flowInfo
.apply(Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
.triggering(AfterEach.inOrder(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(5)))))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardDays(1)));
Windowing Guide
Learn windowing fundamentals
Triggers Guide
Deep dive into trigger mechanisms
Watermarks
Understanding watermarks and event time
Streaming I/O
Streaming sources and sinks