Apache Wayang ships with a set of reference applications that demonstrate its core idea: write your pipeline once against the Wayang API, then run it on any registered execution engine without touching the pipeline code. The examples below progress from a file-based WordCount on a single local engine (switchable with one line so the optimizer can choose between Java and Spark), through a WordCount that reads from and writes to Apache Kafka topics, to an iterative k-means clustering job that showcases loop support. Each example is self-contained and directly runnable once the required Maven modules are on the classpath.
- WordCount
- WordCount on Kafka
- K-Means Clustering
WordCount
WordCount is the canonical “Hello, World” of data processing. It reads a text file, splits each line on non-word characters, drops empty tokens, normalises the remaining tokens to lower case, and counts how many times each word appears. The example below is taken directly from the wayang-applications module and uses the Java fluent API.
Required Maven modules
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-core</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-basic</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<!-- Include one or both platform adapters -->
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spark</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Replace WAYANG_VERSION with the latest release on Maven Central. For snapshot builds, add the Apache snapshot repository; see the Modules reference for the repository XML.
Code
package org.apache.wayang.applications;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Collection;
import java.util.Arrays;
public class WordCount {
public static void main(String[] args) {
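// Settings: the input path is args[1]; args[0] is a placeholder (e.g. "--").
if (args.length < 2) {
System.err.println("Usage: WordCount <placeholder> <input URL>");
System.exit(1);
}
String inputUrl = args[1];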
// Get a plan builder.
// Register only Java to run locally; uncomment Spark to let the
// optimizer choose between both engines.
WayangContext wayangContext = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin());
// .withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName(String.format("WordCount (%s)", inputUrl))
.withUdfJarOf(WordCount.class);
// Start building the WayangPlan.
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
// Read the text file.
.readTextFile(inputUrl).withName("Load file")
// Split each line by non-word characters.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// Filter empty tokens.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// Attach counter to each word.
.map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
// Sum up counters for every word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
.withName("Add counters")
// Execute the plan and collect the results.
.collect();
System.out.println(wordcounts);
System.out.println("*** Done. ***");
}
}
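When validating a run on a small file, it can help to compute the expected counts without Wayang. The following is a plain-JDK sketch, not part of Wayang (the class name LocalWordCount is hypothetical), that applies the same steps as the plan above: split on \W+, drop empty tokens, lower-case, and sum a count of 1 per word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class LocalWordCount {
    // Mirrors the Wayang plan: split on non-word characters, drop empty tokens,
    // lower-case each token, then sum a count of 1 per occurrence.
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                .filter(token -> !token.isEmpty())
                .map(String::toLowerCase)
                // TreeMap keeps the words sorted, which makes output easy to diff.
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("Hello, World!", "hello wayang")));
    }
}
```

Because Java's \W matches any character outside [a-zA-Z_0-9] by default, words containing non-ASCII letters (for example "café") are broken at those letters by both this sketch and the Wayang plan.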
How to run
Build and package
From the repository root, compile the applications module and package the assembly distribution:
./mvnw clean install -DskipTests -pl wayang-applications
./mvnw clean package -pl :wayang-assembly -Pdistribution
Extract the distribution
cd wayang-assembly/target/
tar -xvf apache-wayang-assembly-*-dist.tar.gz
cd wayang-*/
WordCount on Kafka
This example extends the canonical WordCount to read records from an Apache Kafka topic and write the aggregated word counts back to a second Kafka topic. The pipeline structure is identical to the file-based version; only the source and sink operators change. Kafka support is provided through the KafkaTopicSource and KafkaTopicSink operators built into wayang-basic.
Required Maven modules
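<!-- Core and basic operators (always required) -->
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-core</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-basic</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<!-- Fluent API (provides JavaPlanBuilder) -->
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>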
<!-- Java execution engine -->
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<!-- Kafka client (required at runtime by KafkaTopicSource / KafkaTopicSink) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.9.2</version>
</dependency>
Kafka connection properties (bootstrap servers, security settings, consumer group IDs) are loaded from wayang-kafka-defaults.properties. Make sure this file is present on the classpath and configured before running this example.
Code
package org.apache.wayang.applications;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.java.Java;
import org.apache.log4j.Logger;
import java.util.Arrays;
public class WordCountOnKafkaTopic {
private static final Logger logger = Logger.getLogger(WordCountOnKafkaTopic.class);
public static void main(String[] args) {
// Default topic names; pass args[0] to override the input topic.
String input_topicName = "banking-tx-small-csv";
String output_topicName = "word_count_contribution___banking-tx-small-csv";
if (args.length > 0) {
input_topicName = args[0];
}
Configuration configuration = new Configuration();
// Get a plan builder.
WayangContext wayangContext = new WayangContext(configuration)
.withPlugin(Java.basicPlugin());
// .withPlugin(Spark.basicPlugin()); // requires wayang-spark and import org.apache.wayang.spark.Spark
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName(String.format(
"WordCount using Java Context on Kafka topic (%s)", input_topicName))
.withUdfJarOf(WordCountOnKafkaTopic.class);
// Start building the WayangPlan.
planBuilder
// Read records from the input Kafka topic.
.readKafkaTopic(input_topicName).withName("Load data from topic")
// Split each record by non-word characters.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// Filter empty tokens.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// Attach counter to each word.
.map(word -> new Tuple2<>(word.toLowerCase(), 1)).withName("To lower case, add counter")
// Sum up counters for every word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0])))
.withName("Add counters")
// Write results back to a Kafka topic.
.writeKafkaTopic(output_topicName, d -> String.format("%d, %s", d.getField1(), d.getField0()), "job_test_1",
LoadProfileEstimators.createFromSpecification("wayang.java.kafkatopicsink.load", configuration));
System.out.println("### Done. ***");
}
}
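Each record the sink writes is produced by the formatter lambda d -> String.format("%d, %s", d.getField1(), d.getField0()), i.e. the count followed by the word. A downstream consumer can split each value on the first ", ". The plain-Java sketch below (class and method names are hypothetical, and it assumes the record value arrives as a String) mirrors that layout and its inverse.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KafkaRecordFormat {
    // Same layout as the sink's formatter UDF: "<count>, <word>".
    static String format(String word, int count) {
        return String.format("%d, %s", count, word);
    }

    // Inverse of format(). Words produced by split("\\W+") contain only
    // [a-zA-Z_0-9], so the first ", " is always the separator.
    static Map.Entry<String, Integer> parse(String record) {
        int sep = record.indexOf(", ");
        if (sep < 0) {
            throw new IllegalArgumentException("Not a word-count record: " + record);
        }
        return Map.entry(record.substring(sep + 2), Integer.parseInt(record.substring(0, sep)));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new TreeMap<>(Map.of("wayang", 3, "kafka", 1));
        List<String> records = new ArrayList<>();
        counts.forEach((word, count) -> records.add(format(word, count)));
        records.forEach(System.out::println);
        System.out.println(parse(records.get(1)));
    }
}
```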
How to run
Start a Kafka broker
This example requires an accessible Kafka broker. For local development you can start one with Docker:
docker run -d --name kafka -p 9092:9092 apache/kafka:latest
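Create the topics
The commands below assume the Kafka CLI scripts are on your PATH; the apache/kafka image also ships them inside the container under /opt/kafka/bin.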
# Input topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic banking-tx-small-csv
# Output topic
kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--topic word_count_contribution___banking-tx-small-csv
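Before submitting, it may be worth confirming that wayang-kafka-defaults.properties is actually visible on the classpath. This plain-JDK sketch (KafkaDefaultsCheck is a hypothetical helper, not part of Wayang) loads the resource through the class loader and lists its keys; it makes no assumption about which keys the file must contain.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;

class KafkaDefaultsCheck {
    // Loads a properties file from the classpath; empty if the resource is missing.
    static Optional<Properties> load(String resource) {
        try (InputStream in = KafkaDefaultsCheck.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                return Optional.empty();
            }
            Properties props = new Properties();
            props.load(in);
            return Optional.of(props);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Optional<Properties> props = load("wayang-kafka-defaults.properties");
        if (props.isEmpty()) {
            System.err.println("wayang-kafka-defaults.properties is not on the classpath");
        } else {
            // Print the configured keys (not their values) in sorted order.
            props.get().stringPropertyNames().stream().sorted().forEach(System.out::println);
        }
    }
}
```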
K-Means Clustering
Wayang supports iterative processing through its repeat operator, which is essential for machine learning workloads. This k-means implementation clusters 2-D points by running the assign-and-average loop a fixed number of times. A broadcast variable passes the current centroid set to each mapJava call without a separate shuffle. The example uses the Scala API and registers both the Java and Spark engines so the optimizer can choose the most cost-effective platform.
Required Maven modules
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-core</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-basic</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spark</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Input format
The input file contains one 2-D point per line in CSV format:
1.2,3.4
5.6,7.8
...
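The "Create points" map in the listing assumes every line holds at least two comma-separated numbers; a malformed line makes toDouble throw NumberFormatException, or fields(1) throw ArrayIndexOutOfBoundsException, and the job fails. A quick pre-check in plain Java can catch this before submission. The sketch below (PointFileCheck is a hypothetical name) applies the same split(",") and Double.parseDouble that Scala's toDouble delegates to.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class PointFileCheck {
    // Returns 1-based numbers of lines that would fail the "x,y" parsing
    // performed by the k-means map UDF.
    static List<Integer> badLines(List<String> lines) {
        List<Integer> bad = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String[] fields = lines.get(i).split(",");
            try {
                Double.parseDouble(fields[0]);
                Double.parseDouble(fields[1]);
            } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
                bad.add(i + 1);
            }
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        Path file = Path.of(args.length > 0 ? args[0] : "/tmp/kmeans.txt");
        List<Integer> bad = badLines(Files.readAllLines(file));
        System.out.println(bad.isEmpty() ? "OK" : "Malformed lines: " + bad);
    }
}
```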
Code
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import scala.util.Random
import scala.collection.JavaConversions._
object kmeans {
def main(args: Array[String]): Unit = {
// Settings: the input URL may be passed as the first argument.
val inputUrl = if (args.nonEmpty) args(0) else "file:/tmp/kmeans.txt"
val k = 5
val iterations = 100
val configuration = new Configuration
// Get a plan builder.
val wayangContext = new WayangContext(configuration)
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
.withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
.withUdfJarsOf(this.getClass)
case class Point(x: Double, y: Double)
case class TaggedPoint(x: Double, y: Double, cluster: Int)
case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
def add_points(that: TaggedPointCounter) = TaggedPointCounter(this.x + that.x, this.y + that.y, this.cluster, this.count + that.count)
def average = TaggedPointCounter(x / count, y / count, cluster, 0)
}
// Read and parse the input file(s).
val points = planBuilder
.readTextFile(inputUrl).withName("Read file")
.map { line =>
val fields = line.split(",")
Point(fields(0).toDouble, fields(1).toDouble)
}.withName("Create points")
// Create initial centroids.
val random = new Random
val initialCentroids = planBuilder
.loadCollection(for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)).withName("Load random centroids")
// Declare UDF to select centroid for each data point.
class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
/** Keeps the broadcasted centroids. */
var centroids: Iterable[TaggedPointCounter] = _
override def open(executionCtx: ExecutionContext) = {
centroids = executionCtx.getBroadcast[TaggedPointCounter]("centroids")
}
override def apply(point: Point): TaggedPointCounter = {
var minDistance = Double.PositiveInfinity
var nearestCentroidId = -1
for (centroid <- centroids) {
val distance = Math.pow(Math.pow(point.x - centroid.x, 2) + Math.pow(point.y - centroid.y, 2), 0.5)
if (distance < minDistance) {
minDistance = distance
nearestCentroidId = centroid.cluster
}
}
new TaggedPointCounter(point.x, point.y, nearestCentroidId, 1)
}
}
// Do the k-means loop.
val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
points
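// "my.udf.costfunction.key" is a placeholder: define a load-profile
// specification under that key in `configuration`, or drop udfLoad.
.mapJava(new SelectNearestCentroid,
udfLoad = LoadProfileEstimators.createFromSpecification(
"my.udf.costfunction.key", configuration
))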
.withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
.reduceByKey(_.cluster, _.add_points(_)).withName("Add up points")
.withCardinalityEstimator(k)
.map(_.average).withName("Average points")
}).withName("Loop")
// Collect the results.
.collect()
println(finalCentroids)
}
}
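To see what one pass of the loop body computes, here is a plain-Java sketch of the same assign-and-average step. It is not Wayang code, and KMeansStep and its records are hypothetical names: each point goes to the nearest centroid by Euclidean distance, points are summed per cluster as reduceByKey does, and each sum is divided by its count. run applies the step a fixed number of times, much as repeat(iterations, ...) does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KMeansStep {
    record Point(double x, double y) {}
    record Centroid(double x, double y, int cluster) {}

    // Id of the nearest centroid; strict '<' keeps the first one on ties,
    // like SelectNearestCentroid.
    static int nearest(Point p, List<Centroid> centroids) {
        double best = Double.POSITIVE_INFINITY;
        int bestId = -1;
        for (Centroid c : centroids) {
            double d = Math.hypot(p.x() - c.x(), p.y() - c.y());
            if (d < best) {
                best = d;
                bestId = c.cluster();
            }
        }
        return bestId;
    }

    // One loop body: assign points, sum per cluster, then average.
    static List<Centroid> step(List<Point> points, List<Centroid> centroids) {
        Map<Integer, double[]> sums = new TreeMap<>(); // cluster -> {sumX, sumY, count}
        for (Point p : points) {
            double[] acc = sums.computeIfAbsent(nearest(p, centroids), id -> new double[3]);
            acc[0] += p.x();
            acc[1] += p.y();
            acc[2] += 1;
        }
        List<Centroid> next = new ArrayList<>();
        sums.forEach((id, acc) -> next.add(new Centroid(acc[0] / acc[2], acc[1] / acc[2], id)));
        return next;
    }

    static List<Centroid> run(List<Point> points, List<Centroid> initial, int iterations) {
        List<Centroid> current = initial;
        for (int i = 0; i < iterations; i++) {
            current = step(points, current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<Point> points = List.of(new Point(0, 0), new Point(0, 1), new Point(10, 10), new Point(10, 11));
        List<Centroid> initial = List.of(new Centroid(0, 0, 1), new Centroid(10, 10, 2));
        System.out.println(run(points, initial, 5));
    }
}
```

The sketch makes one property of the Scala job as written easy to see: a centroid that attracts no points produces no group in reduceByKey, so it disappears from the next iteration and the final result can contain fewer than k centroids.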
How to run
Prepare the input data
Generate a sample point file (or supply your own):
python3 -c "
import random
with open('/tmp/kmeans.txt', 'w') as f:
    for _ in range(10000):
        f.write(f'{random.gauss(0,1):.6f},{random.gauss(0,1):.6f}\n')
"
Build and package
./mvnw clean package -pl :wayang-assembly -Pdistribution
cd wayang-assembly/target/
tar -xvf apache-wayang-assembly-*-dist.tar.gz
cd wayang-*/
Submit the job
The k-means example above is a standalone Scala object. Add it to a Maven module, compile it, and include it in the assembly fat-jar. Once packaged, submit it with:
./bin/wayang-submit \
kmeans \
file:///tmp/kmeans.txt
Alternatively, the wayang-benchmark module ships a ready-made k-means implementation at org.apache.wayang.apps.kmeans.Kmeans that accepts an experiment descriptor and platform list as its first two arguments (see wayang-benchmark/README.md for the full parameter format).
Increase iterations or k in the source to stress-test the optimizer’s cross-platform decisions. On large inputs, Wayang may move the reduceByKey to Spark while keeping small broadcast-variable lookups on the local Java engine, depending on its cost estimates.
Key API patterns across all examples
Every Wayang pipeline follows the same three-step skeleton regardless of language or platform:
1. Create a WayangContext and register platforms
WayangContext wayangContext = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin()) // local Java Streams engine
.withPlugin(Spark.basicPlugin()); // Apache Spark engine
2. Build the plan with a PlanBuilder
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("MyJob")
.withUdfJarOf(MyClass.class);
// Chain operators exactly like a Java Stream or Scala collection.
Collection<String> result = planBuilder
.readTextFile("file:///input.txt")
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.filter(w -> !w.isEmpty())
.collect();
Each operator call returns a DataQuanta<T> that you continue chaining. Calling .collect(), .writeTextFile(), or .writeKafkaTopic() triggers execution.
3. Provide cardinality and selectivity hints
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9) // (lower, upper, confidence)
.reduceByKey(...)
.withCardinalityEstimator(
new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))
)
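To make the hints concrete, the sketch below pushes an input of N lines through the three WordCount hints and computes the resulting interval for the number of distinct words. It is a back-of-the-envelope calculation, not Wayang's estimator: it assumes each hint scales the lower and upper bounds independently, rounds intermediate values to whole data quanta, and ignores the confidence values. CardinalityHints is a hypothetical name.

```java
import java.util.function.ToLongFunction;

class CardinalityHints {
    // Same single-point function as the reduceByKey estimator above:
    // roughly 1% of the incoming tokens are distinct words.
    static final ToLongFunction<long[]> REDUCE_ESTIMATE = in -> Math.round(0.01 * in[0]);

    // Returns {lower, upper} for the reduceByKey output, given the number of input lines.
    static long[] distinctWords(long lines) {
        long tokensLo = lines * 10;                // flatMap selectivity: an estimated 10 ...
        long tokensHi = lines * 100;               // ... to 100 tokens per line
        long keptLo = Math.round(tokensLo * 0.99); // filter keeps about 99% of tokens
        long keptHi = Math.round(tokensHi * 0.99);
        return new long[] {
            REDUCE_ESTIMATE.applyAsLong(new long[] {keptLo}),
            REDUCE_ESTIMATE.applyAsLong(new long[] {keptHi})
        };
    }

    public static void main(String[] args) {
        long[] est = distinctWords(1_000);
        System.out.printf("1000 lines -> between %d and %d distinct words%n", est[0], est[1]);
    }
}
```

The final interval spans exactly the same 10x ratio as the flatMap hint, so tightening that hint is what narrows the estimate the optimizer works with.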
