Apache Wayang ships with a set of reference applications that demonstrate its core ideas: write your pipeline once against the Wayang API, then point it at any registered execution engine without touching a line of pipeline code. The examples below progress from a file-based WordCount on a single local engine, through a multi-platform version that lets the optimizer choose, to a streaming WordCount that reads from and writes to Apache Kafka topics, and finally to an iterative k-means clustering job that showcases loop support. Each example is self-contained and directly runnable once the required Maven modules are on the classpath.

WordCount

WordCount is the canonical “Hello, World” of data processing. It reads a text file, splits each line on non-word characters, drops empty tokens, normalises the remaining tokens to lower case, and counts how many times each word appears. The example below is taken directly from the wayang-applications module and uses the Java fluent API.

Required Maven modules

<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-core</artifactId>
    <version>WAYANG_VERSION</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-basic</artifactId>
    <version>WAYANG_VERSION</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-api-scala-java</artifactId>
    <version>WAYANG_VERSION</version>
</dependency>
<!-- Include one or both platform adapters -->
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-java</artifactId>
    <version>WAYANG_VERSION</version>
</dependency>
<dependency>
    <groupId>org.apache.wayang</groupId>
    <artifactId>wayang-spark</artifactId>
    <version>WAYANG_VERSION</version>
</dependency>
Replace WAYANG_VERSION with the latest release on Maven Central. For snapshot builds, also add the Apache snapshot repository; see the Modules reference for the repository XML.

Code

package org.apache.wayang.applications;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Collection;
import java.util.Arrays;

public class WordCount {

    public static void main(String[] args) {

        // Settings — input path is args[1]; args[0] is a placeholder (e.g. "--")
        String inputUrl = args[1];

        // Get a plan builder.
        // Register only Java to run locally; uncomment the Spark line to let
        // the optimizer choose between both engines.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin());
        // wayangContext.withPlugin(Spark.basicPlugin());
        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordCount.class);

        // Start building the WayangPlan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // Read the text file.
                .readTextFile(inputUrl).withName("Load file")

                // Split each line by non-word characters.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // Filter empty tokens.
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // Attach counter to each word.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")

                // Sum up counters for every word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
                .withName("Add counters")

                // Execute the plan and collect the results.
                .collect();

        System.out.println(wordcounts);
        System.out.println("*** Done. ***");
    }
}
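
For comparison, the same computation can be sketched with plain JDK streams. This is not Wayang code and runs on a single JVM only; the class name WordCountSketch and the sample input are made up for illustration. It can be handy for checking what the Wayang pipeline above should return on a small input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-JDK illustration of the WordCount logic; no Wayang classes involved.
final class WordCountSketch {

    // Counts lower-cased tokens across all lines, splitting on non-word characters.
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
                // Split each line by non-word characters.
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                // A line that starts with a separator yields a leading empty token.
                .filter(token -> !token.isEmpty())
                // Normalise case before counting.
                .map(String::toLowerCase)
                // Sum a counter of 1 per occurrence; TreeMap keeps keys sorted.
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello, world!", "hello Wayang");
        System.out.println(countWords(lines)); // {hello=2, wayang=1, world=1}
    }
}
```

The filter step matters for the same reason as in the Wayang version: splitting a line that begins with punctuation or whitespace produces an empty first token.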

How to run

1. Build and package

From the repository root, compile the applications module and package the assembly distribution:
./mvnw clean install -DskipTests -pl wayang-applications
./mvnw clean package -pl :wayang-assembly -Pdistribution
2. Extract the distribution

cd wayang-assembly/target/
tar -xvf apache-wayang-assembly-*-dist.tar.gz
cd wayang-*/
3. Submit the job

./bin/wayang-submit \
  org.apache.wayang.applications.WordCount \
  -- \
  file:///path/to/input.txt
To run on Spark, uncomment .withPlugin(Spark.basicPlugin()) in the source and repackage before submitting.
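
The WordCount source reads its input path from args[1] without checking the argument count, so a missing argument surfaces as an ArrayIndexOutOfBoundsException. A defensive check could look like the sketch below. It assumes the argument layout stated in the source comment (args[0] is the placeholder, args[1] is the input URL); SubmitArgsSketch and inputUrl are hypothetical names, not part of Wayang.

```java
// Hypothetical helper, not part of Wayang: validates the argument layout
// described in the WordCount source comment (args[0] placeholder, args[1] input URL).
final class SubmitArgsSketch {

    // Returns the input URL or fails with a readable message.
    static String inputUrl(String[] args) {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "Expected: <placeholder> <input file URL>, got "
                            + args.length + " argument(s)");
        }
        return args[1];
    }

    public static void main(String[] args) {
        String[] sample = { "--", "file:///path/to/input.txt" };
        System.out.println(inputUrl(sample)); // file:///path/to/input.txt
    }
}
```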

Key API patterns across all examples

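Every Wayang pipeline follows the same three-step skeleton regardless of language or platform: create a context and register platforms, build and execute the plan, and optionally give the optimizer hints. First, the context: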
WayangContext wayangContext = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin())   // local Java Streams engine
        .withPlugin(Spark.basicPlugin()); // Apache Spark engine
Register as many or as few platforms as you need. Register one platform for predictability during development; register several in production and let the optimizer decide.
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
        .withJobName("MyJob")
        .withUdfJarOf(MyClass.class);

// Chain operators exactly like a Java Stream or Scala collection.
Collection<String> result = planBuilder
        .readTextFile("file:///input.txt")
        .flatMap(line -> Arrays.asList(line.split("\\W+")))
        .filter(w -> !w.isEmpty())
        .collect();
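Every operator call returns a chainable handle on the intermediate dataset (a DataQuantaBuilder in the Java API, a DataQuanta[T] in the Scala API). Nothing runs until you call a terminal operation such as .collect(), .writeTextFile(), or .writeKafkaTopic(), which triggers execution.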
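
The build-then-trigger behaviour can be observed with plain JDK streams, which defer work in a comparable way. The sketch below is only an analogy and does not use Wayang; LazyPipelineSketch and its method are illustrative names. It counts how often the user-defined function has run before and after the terminal call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only analogy for deferred execution; not Wayang code.
final class LazyPipelineSketch {

    // Returns { UDF calls before collect, UDF calls after collect }.
    static int[] udfCallsBeforeAndAfterCollect() {
        AtomicInteger calls = new AtomicInteger();

        // Declaring the pipeline does not invoke the lambda.
        Stream<String> pipeline = Stream.of("a", "b", "c")
                .map(s -> {
                    calls.incrementAndGet();
                    return s.toUpperCase();
                });
        int before = calls.get();

        // The terminal operation is what actually runs the pipeline.
        List<String> result = pipeline.collect(Collectors.toList());
        return new int[] { before, calls.get() };
    }

    public static void main(String[] args) {
        int[] calls = udfCallsBeforeAndAfterCollect();
        System.out.println(calls[0] + " -> " + calls[1]); // 0 -> 3
    }
}
```

In Wayang the deferral serves an extra purpose: the whole plan is known before anything runs, so the optimizer can assign operators to platforms.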
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)   // (lower, upper, confidence)

.reduceByKey(...)
.withCardinalityEstimator(
    new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))
)
These hints feed Wayang’s cost-based optimizer. Better hints lead to better platform placement decisions. They are optional but recommended for large-scale jobs.
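
To get a feel for what the hints express, the arithmetic can be worked through by hand. The sketch below is illustrative only and is not how Wayang computes estimates internally; it assumes that a selectivity is the ratio of output to input elements, and the Range type, the method name, and the 1,000-line input are made up for the example.

```java
// Illustrative arithmetic only; not Wayang's internal estimation code.
final class CardinalityHintSketch {

    // Hypothetical value type: an estimated range [lower, upper] of element counts.
    static final class Range {
        final long lower;
        final long upper;

        Range(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }

        @Override
        public String toString() {
            return "[" + lower + ", " + upper + "]";
        }
    }

    // Scales an input range by lower and upper selectivity bounds.
    static Range applySelectivity(Range in, double lowerSel, double upperSel) {
        return new Range(Math.round(in.lower * lowerSel), Math.round(in.upper * upperSel));
    }

    public static void main(String[] args) {
        Range lines = new Range(1_000, 1_000);                  // assumed input size
        Range tokens = applySelectivity(lines, 10, 100);        // flatMap hint (10, 100)
        Range nonEmpty = applySelectivity(tokens, 0.99, 0.99);  // filter hint (0.99, 0.99)
        Range distinct = applySelectivity(nonEmpty, 0.01, 0.01); // reduceByKey hint: 1% of input
        System.out.println(tokens + " " + nonEmpty + " " + distinct);
        // [10000, 100000] [9900, 99000] [99, 990]
    }
}
```

Under these assumptions a 1,000-line file is expected to yield between 99 and 990 distinct words, which is the kind of size information a cost model can use when weighing a local engine against a distributed one.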
