The Apache Flink platform adapter lets Wayang schedule operators onto a Flink execution environment. Flink is a unified stream and batch processing engine with a rich operator set, checkpoint-based fault tolerance, and competitive throughput for iterative algorithms. From Wayang’s perspective, you register Flink.basicPlugin() alongside your other platforms and let the optimizer decide when Flink is the best fit for a given operator or sub-plan.
Requirements
- An Apache Flink installation that is compatible with the wayang-flink module. Set FLINK_HOME to the installation directory if you plan to submit jobs to a standalone or YARN cluster.
- Java 17 with JAVA_HOME set.
- For cluster mode, a running Flink JobManager reachable at a known host and port.
Flink’s DataSet API (used by Flink.basicPlugin() with the default BASIC_MAPPINGS) is deprecated in recent Flink versions. Wayang also supports bounded DataStream mappings (BOUNDED_STREAM_MAPPINGS), which provide a forward-compatible path. See Execution modes below.
Maven dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-flink</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Replace WAYANG_VERSION with the latest release on Maven Central.
Entry-point class: Flink
The Flink class in org.apache.wayang.flink provides three static factory methods:
import org.apache.wayang.flink.Flink;
// Standard operators via Flink's DataSet API (default)
// or bounded DataStreams depending on configuration
// Note: PageRankOperator is included inside basicPlugin() via BASIC_MAPPINGS
Flink.basicPlugin()
// Graph plugin stub — currently empty; PageRank is already in basicPlugin()
Flink.graphPlugin()
// Channel-conversion rules (Flink DataSet ↔ Java Collection, etc.)
Flink.conversionPlugin()
Execution modes
FlinkBasicPlugin supports two mapping sets, selected via configuration:
| Mode | Mapping set | Flink API used | Notes |
|---|---|---|---|
| Default (batch) | BASIC_MAPPINGS | DataSet API | Full operator coverage; DataSet API is deprecated in Flink |
| Bounded streams | BOUNDED_STREAM_MAPPINGS | DataStream API (bounded) | Forward-compatible; covers TextFileSource, Map, Join, LocalCallbackSink |
Switch to bounded stream mappings by setting the following property:
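import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.flink.Flink;
Configuration conf = new Configuration();
conf.setProperty("wayang.flink.platforms.useDataStreams", "true");
WayangContext wayang = new WayangContext(conf)
    .withPlugin(Flink.basicPlugin());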
Supported operators
DataSet mappings (default BASIC_MAPPINGS)
Flink.basicPlugin() in default mode provides Flink implementations for:
- Sources: TextFileSource, ObjectFileSource, CollectionSource
- Transformations: MapOperator, MapPartitionsOperator, FlatMapOperator, FilterOperator, SortOperator, DistinctOperator, ZipWithIdOperator
- Aggregations: ReduceByOperator, GlobalReduceOperator, MaterializedGroupByOperator, GlobalMaterializedGroupOperator, CountOperator, GroupByOperator
- Multi-input: JoinOperator, CartesianOperator, CoGroupOperator, UnionAllOperator, IntersectOperator
- Control flow: LoopOperator, DoWhileOperator, RepeatOperator, SampleOperator
- Sinks: TextFileSink, ObjectFileSink, LocalCallbackSink
PageRankOperator is mapped by PageRankMapping, which is already included in BASIC_MAPPINGS (used by Flink.basicPlugin()). Flink.graphPlugin() exists as a stub but currently contributes no additional mappings.
Bounded DataStream mappings (BOUNDED_STREAM_MAPPINGS)
When wayang.flink.platforms.useDataStreams = true, only these operators are available:
- BoundedTextFileSource (bounded DataStream source)
- StreamedMapOperator
- StreamedJoinOperator
- StreamedLocalCallbackSinkOperator
The bounded DataStream mapping set is smaller than the DataSet mapping set. If an operator is not covered, the optimizer will attempt to route it to another registered platform (such as Java Streams). Always register Java.basicPlugin() alongside Flink to give the optimizer a fallback for unsupported operators.
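As a rough illustration of why the fallback matters, the sketch below checks a plan's logical operators against the four operators this page lists for BOUNDED_STREAM_MAPPINGS and reports those that would need another platform. It is a plain-Java toy, not Wayang's optimizer: the class BoundedStreamCoverage, its method needingFallback, and the use of operator names as plain strings are all assumptions made for this example.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Wayang API.
final class BoundedStreamCoverage {

    // Logical operators this page lists as covered by BOUNDED_STREAM_MAPPINGS.
    static final Set<String> COVERED = Set.of(
            "TextFileSource", "MapOperator", "JoinOperator", "LocalCallbackSink");

    // Returns the operators (in plan order, without duplicates)
    // that have no bounded DataStream mapping.
    static Set<String> needingFallback(List<String> planOperators) {
        Set<String> missing = new LinkedHashSet<>(planOperators);
        missing.removeAll(COVERED);
        return missing;
    }

    public static void main(String[] args) {
        // Operators of a word-count style plan (example input only).
        List<String> plan = List.of(
                "TextFileSource", "FlatMapOperator", "FilterOperator",
                "MapOperator", "ReduceByOperator", "LocalCallbackSink");
        System.out.println("Needs a fallback platform: " + needingFallback(plan));
    }
}
```

If the reported set is non-empty and no other platform is registered, there is nowhere for those operators to run, which is why Java.basicPlugin() should be registered as well.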
Configuration
Default properties (wayang-flink-defaults.properties)
# Cost-model parameters
wayang.flink.cpu.mhz = 2700
wayang.flink.hdfs.ms-per-mb = 2.7
wayang.flink.network.ms-per-mb = 8.6
wayang.flink.init.ms = 4500
wayang.flink.stretch = 1
wayang.flink.costs.fix = 0.0
wayang.flink.costs.per-ms = 1.0
# Run mode: collection | local | distribution
wayang.flink.mode.run = collection
wayang.flink.mode.execution = batch_forced
wayang.flink.parallelism = 1
wayang.flink.maxExpanded = 5
conf/flink/default.properties
The repository’s conf/flink/default.properties shows the three available run modes:
# Distributed cluster mode
# wayang.flink.mode.run = distribution
# wayang.flink.master =
# wayang.flink.port =
# wayang.flink.paralelism =
# Local multi-threaded mode
# wayang.flink.mode.run = local
# wayang.flink.paralelism = 1
# Single-threaded collection mode (default; good for tests)
wayang.flink.mode.run = collection
wayang.flink.paralelism = 1
Note that this file spells the key wayang.flink.paralelism (one "l"), whereas the defaults above use wayang.flink.parallelism. Check which spelling your Wayang version reads before relying on either.
Override via Configuration
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.flink.Flink;
Configuration conf = new Configuration();
// Switch to distributed cluster mode
conf.setProperty("wayang.flink.mode.run", "distribution");
conf.setProperty("wayang.flink.master", "flink-master-host");
conf.setProperty("wayang.flink.port", "6123");
conf.setProperty("wayang.flink.parallelism", "16");
// Adjust the cost model to match the cluster's CPU clock speed (in MHz)
conf.setProperty("wayang.flink.cpu.mhz", "3200");
WayangContext wayang = new WayangContext(conf)
    .withPlugin(Flink.basicPlugin());
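To get a feel for what the cost-model parameters express, here is a back-of-the-envelope sketch in plain Java. It is not Wayang's estimator: it assumes, purely for illustration, that time is linear in CPU cycles and in megabytes moved, that wayang.flink.stretch scales the work time, that wayang.flink.init.ms is paid once, and that cost is a fixed charge plus a per-millisecond rate. The class FlinkCostSketch, its methods, and the workload numbers are all made up.

```java
import java.util.Properties;

// Hypothetical back-of-the-envelope model for illustration; not Wayang's estimator.
final class FlinkCostSketch {

    // Default values copied from the properties listing on this page.
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("wayang.flink.cpu.mhz", "2700");
        p.setProperty("wayang.flink.hdfs.ms-per-mb", "2.7");
        p.setProperty("wayang.flink.network.ms-per-mb", "8.6");
        p.setProperty("wayang.flink.init.ms", "4500");
        p.setProperty("wayang.flink.stretch", "1");
        p.setProperty("wayang.flink.costs.fix", "0.0");
        p.setProperty("wayang.flink.costs.per-ms", "1.0");
        return p;
    }

    private static double num(Properties p, String key) {
        return Double.parseDouble(p.getProperty(key).trim());
    }

    // ASSUMPTION: linear in cycles and megabytes; stretch scales the work; init.ms is paid once.
    static double estimateMillis(Properties p, double cpuCycles, double hdfsMb, double networkMb) {
        double cpuMs = cpuCycles / (num(p, "wayang.flink.cpu.mhz") * 1000.0); // 1 MHz = 1000 cycles per ms
        double hdfsMs = hdfsMb * num(p, "wayang.flink.hdfs.ms-per-mb");
        double networkMs = networkMb * num(p, "wayang.flink.network.ms-per-mb");
        double workMs = (cpuMs + hdfsMs + networkMs) * num(p, "wayang.flink.stretch");
        return num(p, "wayang.flink.init.ms") + workMs;
    }

    // ASSUMPTION: cost is a fixed charge plus a per-millisecond rate.
    static double toCost(Properties p, double millis) {
        return num(p, "wayang.flink.costs.fix") + num(p, "wayang.flink.costs.per-ms") * millis;
    }

    public static void main(String[] args) {
        Properties p = defaults();
        // Made-up workload: 2.7e9 CPU cycles, 100 MB read from HDFS, 10 MB sent over the network.
        double before = estimateMillis(p, 2.7e9, 100, 10);
        p.setProperty("wayang.flink.cpu.mhz", "3200"); // same override as in the example above
        double after = estimateMillis(p, 2.7e9, 100, 10);
        System.out.println("Estimated ms at 2700 MHz: " + before + ", cost " + toCost(defaults(), before));
        System.out.println("Estimated ms at 3200 MHz: " + after + ", cost " + toCost(p, after));
    }
}
```

Under these assumptions, raising wayang.flink.cpu.mhz lowers only the CPU share of the estimate, while the fixed start-up time dominates small jobs. That is one reason lightweight stages tend to be cheaper on the Java Streams platform.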
Run modes explained
| wayang.flink.mode.run | Description |
|---|---|
| collection | Single-threaded, in-process execution without a real Flink environment. Best for unit tests. |
| local | Creates a local ExecutionEnvironment with the specified parallelism. No cluster needed. |
| distribution | Connects to an external Flink cluster using wayang.flink.master and wayang.flink.port. |
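The table implies a simple consistency rule: only distribution mode needs a master host and port. A minimal pre-flight check along those lines could look like the following sketch. It works on a plain java.util.Properties object rather than Wayang's Configuration, and the class FlinkRunModeCheck and its validate method are hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of the Wayang API.
final class FlinkRunModeCheck {

    // Returns human-readable problems; an empty list means the settings look consistent.
    static List<String> validate(Properties props) {
        List<String> problems = new ArrayList<>();
        // The defaults shown on this page use "collection" when nothing else is set.
        String mode = props.getProperty("wayang.flink.mode.run", "collection").trim();
        switch (mode) {
            case "collection":
            case "local":
                break; // no cluster coordinates needed
            case "distribution":
                if (isBlank(props.getProperty("wayang.flink.master"))) {
                    problems.add("wayang.flink.master is required in distribution mode");
                }
                String port = props.getProperty("wayang.flink.port");
                if (isBlank(port)) {
                    problems.add("wayang.flink.port is required in distribution mode");
                } else if (!port.trim().matches("\\d{1,5}")) {
                    problems.add("wayang.flink.port must be numeric: " + port);
                }
                break;
            default:
                problems.add("unknown wayang.flink.mode.run: " + mode);
        }
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("wayang.flink.mode.run", "distribution");
        props.setProperty("wayang.flink.master", "flink-master-host");
        // The port is deliberately left unset so that a problem is reported.
        System.out.println(validate(props));
    }
}
```

Running a check like this before building the WayangContext surfaces a missing host or port early instead of at job submission time.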
Registering Flink with Java Streams as a fallback
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.flink.Flink;
WayangContext wayang = new WayangContext(new Configuration())
    .withPlugin(Java.basicPlugin())   // fallback for operators not covered by Flink
    .withPlugin(Flink.basicPlugin()); // Flink for large-scale batch stages
Always register Java.basicPlugin() alongside Flink. The Java Streams platform acts as a fallback for any operator not mapped in the Flink plugin (particularly important when using BOUNDED_STREAM_MAPPINGS), and it is also the preferred engine for lightweight stages such as final result collection and small in-memory aggregations.
Word count on Flink
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.java.Java;
import org.apache.wayang.flink.Flink;
import java.util.Arrays;
public class FlinkWordCount {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Run in local mode with a parallelism of 4
        conf.setProperty("wayang.flink.mode.run", "local");
        conf.setProperty("wayang.flink.parallelism", "4");
        WayangContext wayang = new WayangContext(conf)
            .withPlugin(Java.basicPlugin())
            .withPlugin(Flink.basicPlugin());
        new JavaPlanBuilder(wayang)
            .withJobName("FlinkWordCount")
            .withUdfJarOf(FlinkWordCount.class)
            .readTextFile("file:///path/to/input.txt")
            .flatMap(line -> Arrays.asList(line.split("\\W+")))
            .filter(word -> !word.isEmpty())
            .map(word -> new Tuple2<>(word.toLowerCase(), 1))
            .reduceByKey(
                Tuple2::getField0,
                (a, b) -> new Tuple2<>(a.getField0(), a.getField1() + b.getField1()))
            .writeTextFile(
                "file:///path/to/output.txt",
                t -> t.getField0() + ": " + t.getField1());
    }
}
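When trying the job on a small input, it can help to have a reference result to compare against. The sketch below applies the same steps as the pipeline above (split on non-word characters, drop empty tokens, lower-case, count per word) using only java.util.stream. It does not use Wayang at all, and the class ReferenceWordCount is a hypothetical name introduced for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical reference implementation for illustration; it does not use Wayang.
final class ReferenceWordCount {

    // Same steps as the Wayang pipeline: split on non-word characters,
    // drop empty tokens, lower-case, then count occurrences per word.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                .filter(word -> !word.isEmpty())
                .map(String::toLowerCase)
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints one "word: count" line per distinct word, sorted alphabetically.
        count(List.of("Hello, hello world!", "World of Flink"))
                .forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```

The output uses the same "word: count" format as the formatter passed to writeTextFile, so the lines can be compared after sorting, keeping in mind that a distributed run does not guarantee output order.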