Apache Wayang’s Java API gives you a fluent, Scala-like builder that drives the full pipeline, from source to sink, without writing a line of Scala. JavaPlanBuilder is the entry point: create a WayangContext, hand it to JavaPlanBuilder, and chain transformation methods. Each method returns a DataQuantaBuilder, so the optimizer sees the whole plan before a single byte is read.
Setup and Maven dependency
Add the core bundle to your pom.xml. Replace WAYANG_VERSION with the latest release from Maven Central.
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-assembly</artifactId>
<version>WAYANG_VERSION</version>
<classifier>bin</classifier>
<type>tar.gz</type>
</dependency>
For a modular setup add individual platform artifacts instead:
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spark_2.12</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Creating WayangContext and JavaPlanBuilder
Every Java pipeline starts with a WayangContext that declares which execution engines are available, followed by a JavaPlanBuilder that builds and executes the logical plan.
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
WayangContext wayangContext = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin()) // local Java Streams engine
.withPlugin(Spark.basicPlugin()); // Apache Spark engine
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("MyJob")
.withUdfJarOf(MyApp.class); // ship this class's JAR to remote engines
JavaPlanBuilder constructor overloads:
| Constructor | Description |
|---|---|
| new JavaPlanBuilder(WayangContext ctx) | Creates a builder with no pre-set job name. |
| new JavaPlanBuilder(WayangContext ctx, String jobName) | Creates a builder with a job name for logging and tracking. |
Builder configuration methods
These methods configure the plan and return this for chaining.
withJobName(String jobName)
Sets a human-readable name for the job. Appears in logs and the cost model experiment output.
withUdfJar(String path)
Registers a JAR file that will be shipped to remote execution engines (e.g., Spark workers). Pass the path to your application JAR.
withUdfJarOf(Class<?> cls)
Convenience overload — resolves the JAR file that contains the given class and registers it automatically.
Source operators
Source methods on JavaPlanBuilder begin a pipeline by creating a DataQuantaBuilder from an external dataset.
readTextFile
planBuilder.readTextFile(String url)
Reads a text file line by line and produces a DataQuantaBuilder<String>. The URL must be a fully qualified URI such as file:///path/to/file.txt or hdfs://host/path.
planBuilder.readTextFile("file:///data/input.txt")
.withName("Read input");
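Because readTextFile expects a fully qualified URI, a bare filesystem path has to be converted first. The sketch below uses only the JDK to do that and to check whether a string already carries a scheme. FileUrls, toFileUrl, and hasScheme are illustrative names and are not part of Wayang.

```java
import java.net.URI;
import java.nio.file.Paths;

final class FileUrls {
    // Converts a local path into an absolute file: URL string.
    static String toFileUrl(String localPath) {
        return Paths.get(localPath).toAbsolutePath().normalize().toUri().toString();
    }

    // Returns true when the string already carries a URI scheme such as file or hdfs.
    // URI.create throws IllegalArgumentException for strings that are not valid URIs,
    // for example paths that contain spaces.
    static boolean hasScheme(String url) {
        return URI.create(url).getScheme() != null;
    }

    public static void main(String[] args) {
        System.out.println(toFileUrl("/data/input.txt"));
        System.out.println(hasScheme("data/input.txt"));   // prints false
        System.out.println(hasScheme("hdfs://host/path")); // prints true
    }
}
```

On a Unix-like system the first line printed is a file:/// URL for the given path, which can be passed to readTextFile as is.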
readParquet
planBuilder.readParquet(String url, String[] projection)
planBuilder.readParquet(String url, String[] projection, boolean preferDataset)
Reads a Parquet file and returns DataQuantaBuilder<Record>. Pass null for projection to read all columns. Set preferDataset = true to keep the channel backed by a Spark Dataset for downstream Dataset-aware operations.
planBuilder.readParquet("hdfs:///warehouse/sales.parquet", new String[]{"id", "amount"});
readApacheIcebergTable
planBuilder.readApacheIcebergTable(Catalog catalog, TableIdentifier tableIdentifier,
Expression[] filterExpressions, String[] projectionColumns)
Reads an Apache Iceberg table via a Catalog instance and returns DataQuantaBuilder<Record>. Both filterExpressions and projectionColumns may be null.
readTable
planBuilder.readTable(TableSource source)
Reads rows from a JDBC-compatible TableSource and returns a RecordDataQuantaBuilder.
loadCollection
planBuilder.loadCollection(java.util.Collection<T> collection)
Wraps an in-memory Collection as a DataQuantaBuilder<T>. Useful for small lookup tables and test data.
Cloud storage sources
planBuilder.readGoogleCloudStorageFile(String bucket, String blobName, String credentialsPath)
planBuilder.readAmazonS3File(String bucket, String blobName, String credentialsPath)
planBuilder.readAzureBlobStorageFile(String container, String blobName, String credentialsPath)
planBuilder.readKafkaTopic(String topicName)
Each returns a DataQuantaBuilder<String> (one element per line / message).
After a source call you have a DataQuantaBuilder<T>. Every method below transforms the data and returns a new DataQuantaBuilder, except for sinks, which trigger execution.
map / mapJava
Transform each element one-to-one.
// Java lambda (preferred from Java code)
.map(word -> word.toLowerCase())
// Explicit SerializableFunction
.mapJava((SerializableFunction<String, String>) String::toLowerCase)
Transformation function applied to every element.
udfLoad (LoadProfileEstimator, optional): Cost estimator for the UDF. Omit in most cases; the default heuristic works well.
flatMap / flatMapJava
Transform each element into zero or more output elements.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9) // min, max, confidence
selectivity (ProbabilisticDoubleInterval, optional): Hints to the optimizer how many outputs each input produces on average.
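One way to choose the bounds you pass to withSelectivity is to measure the fan-out on a small sample of the input. The following plain-Java sketch does that for the split used above. It does not call Wayang, and FanOut and outputsPerInput are hypothetical names.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;

final class FanOut {
    // Counts how many tokens each sample line yields under the same split
    // that the flatMap UDF applies, and summarizes the counts.
    static DoubleSummaryStatistics outputsPerInput(List<String> sampleLines) {
        return sampleLines.stream()
                .mapToDouble(line -> line.split("\\W+").length)
                .summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics stats = outputsPerInput(
                Arrays.asList("to be or not to be", "that is the question"));
        // Min and max are candidates for the lower and upper selectivity bounds.
        System.out.println(stats.getMin());     // prints 4.0
        System.out.println(stats.getMax());     // prints 6.0
        System.out.println(stats.getAverage()); // prints 5.0
    }
}
```

The confidence value still has to come from your own judgment of how representative the sample is.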
filter / filterJava
Keep only elements matching a predicate.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
Predicate; elements where it returns true are kept.
SQL WHERE clause equivalent, used when the operator is pushed down to a database.
reduceByKey
Group elements by a key and aggregate within each group.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
Extracts the grouping key.
Binary aggregation function applied within each group.
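To make the contract concrete, here is a plain-Java sketch of what "group by key, then fold each group with a binary function" computes. It uses java.util.stream rather than Wayang, stands in Map.Entry for Tuple2, and returns a map, whereas Wayang's operator yields the reduced elements themselves. ReduceByKeySketch and pair are hypothetical names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ReduceByKeySketch {
    // Small helper that plays the role of new Tuple2<>(word, count).
    static Map.Entry<String, Integer> pair(String word, int count) {
        return new SimpleImmutableEntry<>(word, count);
    }

    // Groups elements by key and folds each group with the reducer.
    static <T, K> Map<K, T> reduceByKey(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        return input.stream()
                .collect(Collectors.toMap(keyOf, Function.identity(), reducer, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs =
                Arrays.asList(pair("a", 1), pair("b", 1), pair("a", 1));
        Map<String, Map.Entry<String, Integer>> counts = reduceByKey(
                pairs,
                e -> e.getKey(),
                (x, y) -> pair(x.getKey(), x.getValue() + y.getValue()));
        counts.values().forEach(System.out::println); // prints a=2 then b=1
    }
}
```

Note that the reducer receives and returns whole elements, not just the values, which is why the lambda rebuilds the pair with the shared key.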
reduce / globalReduce
Aggregate all elements in the dataset into a single value.
groupByKey / materializedGroupBy
Group elements by a key and collect each group into an Iterable.
.groupByKey(word -> word.charAt(0))
// returns DataQuantaBuilder<Iterable<String>>
count
Count the total number of elements; returns DataQuantaBuilder<Long>.
sort
Sort elements by a key extractor.
distinct
Remove duplicate elements.
union
Merge two datasets of the same type.
DataQuantaBuilder<String> left = planBuilder.readTextFile("file:///a.txt");
DataQuantaBuilder<String> right = planBuilder.readTextFile("file:///b.txt");
left.union(right)
intersect
Keep only elements present in both datasets.
join / joinJava
Equi-join two datasets on extracted keys. The result type is Tuple2<Left, Right>.
users.join(
u -> u.getId(), // key from left
orders,
o -> o.getUserId() // key from right
)
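The shape of the join result can be illustrated with a small hash join in plain Java. This is a sketch of equi-join semantics only, not of how Wayang or any of its engines execute the operator. EquiJoinSketch is a hypothetical name, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class EquiJoinSketch {
    // Pairs every left element with every right element that has an equal key.
    static <L, R, K> List<Map.Entry<L, R>> join(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey) {
        // Build phase: index the right side by key.
        Map<K, List<R>> index = new HashMap<>();
        for (R r : right) {
            index.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: look up the key of each left element.
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) {
            for (R r : index.getOrDefault(leftKey.apply(l), Collections.emptyList())) {
                out.add(new SimpleImmutableEntry<>(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("1:ann", "2:bob", "3:cy");
        List<String> orders = Arrays.asList("1:book", "1:pen", "3:cup");
        Function<String, String> id = s -> s.substring(0, s.indexOf(':'));
        // User 2 has no orders and therefore produces no output pair.
        join(users, id, orders, id).forEach(System.out::println);
    }
}
```

A left element with several matches appears once per match, and elements without a match on the other side are dropped.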
coGroup
Like join, but produces pairs of Iterable groups instead of flat pairs.
users.coGroup(u -> u.getId(), orders, o -> o.getUserId())
// DataQuantaBuilder<Tuple2<Iterable<User>, Iterable<Order>>>
cartesian
Compute the Cartesian product of two datasets.
sample
Sample a fixed number of elements (optionally with a seed).
.sample(1000)
.sample(1000, SampleOperator.UNKNOWN_DATASET_SIZE, Optional.of(42L), SampleOperator.Methods.RANDOM)
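Passing a seed matters when you need the same sample on every run, for example in tests. The plain-Java sketch below shows the idea with a seeded shuffle. It is not Wayang's sampling algorithm, and SeededSample is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class SeededSample {
    // Draws up to n elements without replacement; the same seed yields the same sample.
    static <T> List<T> sample(List<T> data, int n, long seed) {
        List<T> copy = new ArrayList<>(data);
        Collections.shuffle(copy, new Random(seed));
        return new ArrayList<>(copy.subList(0, Math.min(n, copy.size())));
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        List<Integer> first = sample(data, 5, 42L);
        List<Integer> second = sample(data, 5, 42L);
        System.out.println(first.equals(second)); // prints true
    }
}
```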
doWhile / doWhileJava
Execute a loop body until a convergence condition holds.
initialData.doWhileJava(
convergenceSet -> convergenceSet.size() < threshold,
loopInput -> {
DataQuantaBuilder<T> next = loopInput.map(...);
DataQuantaBuilder<T> conv = next.filter(...);
return new WayangTuple<>(next, conv);
},
20 // numExpectedIterations
)
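The control flow of the loop can be sketched in plain Java: the body runs, returns both the data for the next pass and a convergence dataset, and the loop stops once the condition holds on that convergence dataset. This mirrors the description above and is not Wayang's implementation. DoWhileSketch and Step are hypothetical names, and the sketch leaves out numExpectedIterations.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class DoWhileSketch {
    // Result of one pass: the input for the next pass plus the data
    // that the convergence condition inspects.
    static final class Step<T> {
        final List<T> next;
        final List<T> convergence;

        Step(List<T> next, List<T> convergence) {
            this.next = next;
            this.convergence = convergence;
        }
    }

    // Runs the body at least once and stops as soon as the condition holds.
    static <T> List<T> doWhile(List<T> initial,
                               Predicate<Collection<T>> condition,
                               Function<List<T>, Step<T>> body) {
        List<T> current = initial;
        Step<T> step;
        do {
            step = body.apply(current);
            current = step.next;
        } while (!condition.test(step.convergence));
        return current;
    }

    public static void main(String[] args) {
        int threshold = 1;
        List<Integer> result = doWhile(
                Arrays.asList(16, 5),
                convergenceSet -> convergenceSet.size() < threshold,
                input -> {
                    // Halve every value; values still above 1 form the convergence set.
                    List<Integer> next = input.stream().map(v -> v / 2).collect(Collectors.toList());
                    List<Integer> stillLarge = next.stream().filter(v -> v > 1).collect(Collectors.toList());
                    return new Step<>(next, stillLarge);
                });
        System.out.println(result); // prints [1, 0]
    }
}
```

In this run the body executes four times: the convergence set shrinks from two elements to one and finally to none, at which point the condition holds.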
repeat / repeatJava
Execute a loop body a fixed number of times.
initialCentroids.repeat(100, current ->
current.map(reassignPoints)
.reduceByKey(TaggedPoint::getCluster, TaggedPoint::sum)
.map(TaggedPoint::average)
)
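The snippet above follows the usual k-means update: assign each point to a centroid, sum per cluster, then average. As a minimal plain-Java sketch of the same fixed-iteration pattern, the code below runs that update on one-dimensional points. RepeatSketch and updateCentroids are hypothetical names; the Wayang types TaggedPoint and reassignPoints from the snippet are not modeled.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;

final class RepeatSketch {
    // Applies the loop body a fixed number of times, feeding each result back in.
    static <T> T repeat(int times, T initial, UnaryOperator<T> body) {
        T current = initial;
        for (int i = 0; i < times; i++) {
            current = body.apply(current);
        }
        return current;
    }

    // One k-means pass over 1-D points: assign each point to its nearest
    // centroid, sum per cluster, then average.
    static double[] updateCentroids(double[] points, double[] centroids) {
        double[] sum = new double[centroids.length];
        int[] count = new int[centroids.length];
        for (double p : points) {
            int nearest = 0;
            for (int c = 1; c < centroids.length; c++) {
                if (Math.abs(p - centroids[c]) < Math.abs(p - centroids[nearest])) {
                    nearest = c;
                }
            }
            sum[nearest] += p;
            count[nearest]++;
        }
        double[] updated = new double[centroids.length];
        for (int c = 0; c < centroids.length; c++) {
            // An empty cluster keeps its previous centroid.
            updated[c] = count[c] == 0 ? centroids[c] : sum[c] / count[c];
        }
        return updated;
    }

    public static void main(String[] args) {
        double[] points = {1, 2, 9, 10};
        double[] result = repeat(10, new double[]{0, 5}, c -> updateCentroids(points, c));
        System.out.println(Arrays.toString(result)); // prints [1.5, 9.5]
    }
}
```

Unlike doWhile, the iteration count is fixed up front, so the loop keeps running even after the centroids stop moving.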
Sink operators (trigger execution)
Calling a sink method compiles the logical plan, runs the optimizer, and executes the job.
collect
Collect all output elements into a Java Collection on the driver.
Collection<Tuple2<String, Integer>> results = planBuilder
.readTextFile("file:///input.txt")
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.map(w -> new Tuple2<>(w, 1))
.reduceByKey(Tuple2::getField0, (a, b) -> new Tuple2<>(a.getField0(), a.getField1() + b.getField1()))
.collect();
writeTextFile
Write each element, formatted by a UDF, to a text file. The jobName parameter is required on DataQuantaBuilder to label the execution job.
.writeTextFile("file:///output.txt", t -> t.getField0() + "\t" + t.getField1(), "WriteResults")
foreach / foreachJava
Apply a side-effecting action to each element.
.foreach(System.out::println)
DataQuantaBuilder annotation methods
Every DataQuantaBuilder exposes the following fluent annotations. These do not change the data flow — they provide information to the optimizer and logging infrastructure.
| Method | Purpose |
|---|---|
| .withName(String name) | Labels the operator in logs and plan visualizations. |
| .withSelectivity(double lowerBound, double upperBound, double confidence) | Hints to the optimizer how much data this operator produces relative to its input. |
| .withCardinalityEstimator(CardinalityEstimator estimator) | Plugs in a custom cardinality estimator for more accurate cost modeling. |
| .withTargetPlatform(Platform platform) | Restricts execution of this operator to a specific platform. |
| .withUdfJarOf(Class<?> cls) | Ensures the JAR containing cls is shipped to remote engines. |
| .withBroadcast(DataQuantaBuilder<?> sender, String name) | Makes the sender dataset available as a broadcast variable inside this operator’s UDF. |
Complete Java WordCount example
The WordCount pipeline is the canonical demonstration of Wayang’s API. It runs unchanged on local Java Streams, Apache Spark, or both simultaneously — you only change which plugins you register.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Arrays;
import java.util.Collection;
public class WordCountJava {
public static void main(String[] args) {
String inputUrl = "file:///tmp/words.txt";
// Register execution platforms.
WayangContext wayangContext = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin());
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName(String.format("WordCount (%s)", inputUrl))
.withUdfJarOf(WordCountJava.class);
// Build and execute the plan.
Collection<Tuple2<String, Integer>> wordcounts = planBuilder
// 1. Read the text file — one String per line.
.readTextFile(inputUrl).withName("Load file")
// 2. Split each line into words.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.withSelectivity(10, 100, 0.9)
.withName("Split words")
// 3. Drop the empty token that split() yields when a line starts with a non-word character.
.filter(token -> !token.isEmpty())
.withSelectivity(0.99, 0.99, 0.99)
.withName("Filter empty words")
// 4. Pair each word with count 1.
.map(word -> new Tuple2<>(word.toLowerCase(), 1))
.withName("To lower case, add counter")
// 5. Sum counts per word.
.reduceByKey(
Tuple2::getField0,
(t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
.withCardinalityEstimator(
new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))
)
.withName("Sum counts")
// 6. Collect results back to the driver.
.collect();
wordcounts.forEach(System.out::println);
}
}
The example registers both Java.basicPlugin() and Spark.basicPlugin(), so the cost-based optimizer is free to choose either engine for each operator and may split the job across them. To pin the job to a single engine, register only that engine's plugin.
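The lambda passed to DefaultCardinalityEstimator in step 5 maps the input cardinalities to an expected output cardinality, here 1% of the first input. The sketch below reproduces only that arithmetic in plain Java. ToLongFunction<long[]> is a stand-in chosen for illustration, not Wayang's own function type, and CardinalitySketch is a hypothetical name.

```java
import java.util.function.ToLongFunction;

final class CardinalitySketch {
    // Same expression as in the WordCount example: 1% of the first input's cardinality.
    static final ToLongFunction<long[]> ONE_PERCENT = in -> Math.round(0.01 * in[0]);

    static long estimate(long inputCardinality) {
        return ONE_PERCENT.applyAsLong(new long[]{inputCardinality});
    }

    public static void main(String[] args) {
        System.out.println(estimate(1_000_000L)); // prints 10000
        System.out.println(estimate(120L));       // prints 1
    }
}
```

If your vocabulary is larger or smaller relative to the token count, adjust the factor accordingly.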
Broadcast variables
Broadcast variables let you ship a small reference dataset to every executor so that UDFs can read from it without a join.
Collection<String> stopWords = Arrays.asList("the", "a", "an");
DataQuantaBuilder<String> stopWordsDQ = planBuilder
.loadCollection(stopWords)
.withName("Stop words");
planBuilder
.readTextFile("file:///corpus.txt")
.flatMap(line -> Arrays.asList(line.split("\\W+")))
.filter(word -> true) // placeholder predicate; a real UDF reads the broadcast inside an ExtendedSerializableFunction
.withBroadcast(stopWordsDQ, "stopWords")
.collect();
Inside an ExtendedSerializableFunction, call executionContext.getBroadcast("stopWords") in the open() method to retrieve the broadcast collection.
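The open-then-apply lifecycle can be sketched without Wayang on the classpath. In the code below, SketchContext is a stand-in for the execution context and NotAStopWord plays the role of the UDF; both are hypothetical and do not reproduce Wayang's interfaces. The point is the pattern: read the broadcast once in open(), cache it in a structure suited to lookups, and use it for every element.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class BroadcastSketch {
    // Stand-in for the context handed to open(); NOT Wayang's ExecutionContext.
    interface SketchContext {
        Collection<String> getBroadcast(String name);
    }

    // Stop-word predicate that reads the broadcast once and reuses it per element.
    static final class NotAStopWord {
        private Set<String> stopWords = Collections.emptySet();

        void open(SketchContext ctx) {
            // Copy into a HashSet so each test() call is a constant-time lookup.
            stopWords = new HashSet<>(ctx.getBroadcast("stopWords"));
        }

        boolean test(String word) {
            return !stopWords.contains(word);
        }
    }

    // Simulates what an engine would do: call open() once, then test() per element.
    static List<String> filterStopWords(List<String> words, Collection<String> stopWords) {
        Map<String, Collection<String>> broadcasts = Collections.singletonMap("stopWords", stopWords);
        NotAStopWord predicate = new NotAStopWord();
        predicate.open(broadcasts::get);
        return words.stream().filter(predicate::test).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> kept = filterStopWords(
                Arrays.asList("the", "quick", "a", "fox"),
                Arrays.asList("the", "a", "an"));
        System.out.println(kept); // prints [quick, fox]
    }
}
```

Doing the lookup in open() rather than in test() avoids fetching and converting the broadcast for every element.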
Writing results
Text file
.writeTextFile(
"file:///output/results.txt",
tuple -> tuple.getField0() + "\t" + tuple.getField1(),
"WriteResults"
);
Kafka topic
.writeKafkaTopic("output-topic", record -> record.toString(), "WriteKafka", null);
Database table
Properties props = new Properties();
props.setProperty("url", "jdbc:postgresql://host/db");
props.setProperty("user", "wayang");
props.setProperty("password", "secret");
.writeTable("results", "overwrite", new String[]{"word", "count"}, props);