

Apache Wayang's Java API gives you a fluent, Scala-like builder that drives the full pipeline, from source to sink, without writing a line of Scala. JavaPlanBuilder is the entry point: create a WayangContext, hand it to JavaPlanBuilder, and chain transformation methods. Each of these returns a DataQuantaBuilder and only describes the plan, so the optimizer sees the complete pipeline before a single byte is read.

Setup and Maven dependency

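Add the assembly bundle to your pom.xml, replacing WAYANG_VERSION with the latest release from Maven Central. This artifact is a binary distribution archive (classifier bin, type tar.gz). Maven does not put tar.gz dependencies on the compile classpath, so to compile against the API you will typically use the modular setup below.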
pom.xml
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-assembly</artifactId>
  <version>WAYANG_VERSION</version>
  <classifier>bin</classifier>
  <type>tar.gz</type>
</dependency>
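For a modular setup, add individual platform artifacts instead. You also need the API module that provides JavaPlanBuilder (wayang-api-scala-java, possibly with a Scala-version suffix such as _2.12). Check Maven Central for the exact artifact IDs of your release.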
pom.xml
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-java</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-spark_2.12</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>

Creating WayangContext and JavaPlanBuilder

Every Java pipeline starts with a WayangContext that declares which execution engines are available, followed by a JavaPlanBuilder that builds and executes the logical plan.
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

WayangContext wayangContext = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin())   // local Java Streams engine
        .withPlugin(Spark.basicPlugin()); // Apache Spark engine

JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
        .withJobName("MyJob")
        .withUdfJarOf(MyApp.class);       // ship this class's JAR to remote engines
JavaPlanBuilder constructor overloads:
Constructor | Description
new JavaPlanBuilder(WayangContext ctx) | Creates a builder with no preset job name.
new JavaPlanBuilder(WayangContext ctx, String jobName) | Creates a builder with a job name for logging and tracking.

Builder configuration methods

These methods configure the plan and return this for chaining.
withJobName(String jobName): JavaPlanBuilder
Sets a human-readable name for the job. The name appears in the logs and in the cost-model experiment output.
withUdfJar(String path): JavaPlanBuilder
Registers a JAR file to ship to remote execution engines (e.g., Spark workers). Pass the path to your application JAR.
withUdfJarOf(Class<?> cls): JavaPlanBuilder
Convenience overload: resolves the JAR file that contains the given class and registers it automatically.
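withUdfJarOf needs to map a class to the JAR it was loaded from. The sketch below shows one common way to do that with standard JDK calls (ProtectionDomain and CodeSource). JarLocator and locate are hypothetical names, and Wayang's own lookup may work differently.

```java
import java.net.URISyntaxException;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical helper, not part of Wayang: finds the JAR (or class directory) a class was loaded from.
class JarLocator {

    static Optional<String> locate(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        // JDK classes loaded by the bootstrap loader report no code source.
        if (source == null || source.getLocation() == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Paths.get(source.getLocation().toURI()).toString());
        } catch (URISyntaxException | IllegalArgumentException | FileSystemNotFoundException e) {
            // The location is not a path on the default file system.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(locate(JarLocator.class).orElse("<no code source>"));
        System.out.println(locate(String.class).orElse("<no code source>")); // <no code source>
    }
}
```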

Source operators

Source methods on JavaPlanBuilder begin a pipeline by creating a DataQuantaBuilder from an external dataset.

readTextFile

planBuilder.readTextFile(String url)
Reads a text file line by line and produces a DataQuantaBuilder<String>. The URL must be a fully qualified URI such as file:///path/to/file.txt or hdfs://host/path.
planBuilder.readTextFile("file:///data/input.txt")
           .withName("Read input");
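readTextFile expects a fully qualified URI, so it can help to normalize local paths before you pass them in. This is a plain-Java sketch. InputUrls, isFullyQualified, and toFileUrl are hypothetical helpers, not Wayang API.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helpers for preparing readTextFile URLs; not part of Wayang.
class InputUrls {

    // True if the string parses as a URI with an explicit scheme (file:, hdfs:, ...).
    static boolean isFullyQualified(String url) {
        try {
            return new URI(url).getScheme() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    // Turns a local path into a file: URI string, e.g. file:///data/input.txt on Unix.
    static String toFileUrl(String localPath) {
        Path absolute = Paths.get(localPath).toAbsolutePath();
        return absolute.toUri().toString();
    }

    public static void main(String[] args) {
        System.out.println(isFullyQualified("file:///data/input.txt")); // true
        System.out.println(isFullyQualified("/data/input.txt"));        // false
        System.out.println(toFileUrl("input.txt"));
    }
}
```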

readParquet

planBuilder.readParquet(String url, String[] projection)
planBuilder.readParquet(String url, String[] projection, boolean preferDataset)
Reads a Parquet file and returns DataQuantaBuilder<Record>. Pass null for projection to read all columns. Set preferDataset = true to keep the channel backed by a Spark Dataset for downstream Dataset-aware operations.
planBuilder.readParquet("hdfs:///warehouse/sales.parquet", new String[]{"id", "amount"});

readApacheIcebergTable

planBuilder.readApacheIcebergTable(Catalog catalog, TableIdentifier tableIdentifier,
                                   Expression[] filterExpressions, String[] projectionColumns)
Reads an Apache Iceberg table via a Catalog instance and returns DataQuantaBuilder<Record>. Both filterExpressions and projectionColumns may be null.

readTable

planBuilder.readTable(TableSource source)
Reads rows from a JDBC-compatible TableSource and returns a RecordDataQuantaBuilder.

loadCollection

planBuilder.loadCollection(java.util.Collection<T> collection)
Wraps an in-memory Collection as a DataQuantaBuilder<T>. Useful for small lookup tables and test data.

Cloud storage sources

planBuilder.readGoogleCloudStorageFile(String bucket, String blobName, String credentialsPath)
planBuilder.readAmazonS3File(String bucket, String blobName, String credentialsPath)
planBuilder.readAzureBlobStorageFile(String container, String blobName, String credentialsPath)
planBuilder.readKafkaTopic(String topicName)
Each returns a DataQuantaBuilder<String> (one element per line / message).

DataQuanta transformation methods

After a source call you have a DataQuantaBuilder<T>. Each transformation below returns a new DataQuantaBuilder and only extends the plan. Nothing runs until you call a sink operator, which triggers execution.

map / mapJava

Transform each element one-to-one.
// Java lambda (preferred from Java code)
.map(word -> word.toLowerCase())

// Explicit SerializableFunction
.mapJava((SerializableFunction<String, String>) String::toLowerCase)
udf (Out → NewOut): Transformation function applied to every element.
udfLoad (LoadProfileEstimator, optional): Cost estimator for the UDF. It is usually omitted, in which case the optimizer falls back to its default cost estimate.

flatMap / flatMapJava

Transform each element into zero or more output elements.
.flatMap(line -> Arrays.asList(line.split("\\W+")))
 .withSelectivity(10, 100, 0.9)   // min, max, confidence
udf (Out → Iterable[NewOut]): Flattening function that maps each element to zero or more outputs.
selectivity (ProbabilisticDoubleInterval, optional): Hints to the optimizer how many outputs each input produces on average.
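The following plain java.util.stream sketch (not Wayang code) reproduces the flatMap above on three lines. It also shows a subtlety of String.split. A line that starts with a non-word character yields a leading empty token, and a blank line yields a single empty token. This is why the WordCount example filters empty tokens afterwards. FlatMapSketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plain-Java stand-in for .flatMap(line -> Arrays.asList(line.split("\\W+"))).
class FlatMapSketch {

    static List<String> tokens(List<String> lines) {
        return lines.stream()
                // Each line expands into zero or more tokens; flatMap concatenates them.
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(" Hello, world!", "", "Wayang rocks");
        System.out.println(tokens(lines)); // [, Hello, world, , Wayang, rocks]
    }
}
```

Here three lines produce six tokens, an observed selectivity of 2. By contrast, withSelectivity(10, 100, 0.9) tells the optimizer to expect 10 to 100 tokens per line.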

filter / filterJava

Keep only elements matching a predicate.
.filter(token -> !token.isEmpty())
 .withSelectivity(0.99, 0.99, 0.99)
udf (Out → Boolean): Predicate; elements for which it returns true are kept.
sqlUdf (String, optional): Equivalent SQL WHERE clause, used when the operator is pushed down to a database.

reduceByKey

Group elements by a key and aggregate within each group.
.reduceByKey(
    Tuple2::getField0,
    (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
)
keyUdf (Out → Key): Extracts the grouping key.
udf ((Out, Out) → Out): Binary aggregation function applied within each group.
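To make the semantics concrete, here is a single-machine model of reduceByKey in plain Java. Elements are grouped by keyUdf, and each group is folded with udf. Map.Entry stands in for Wayang's Tuple2, and ReduceByKeySketch is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical single-machine model of reduceByKey; Map.Entry stands in for Tuple2.
class ReduceByKeySketch {

    static <T, K> Map<K, T> reduceByKey(List<T> data, Function<T, K> keyUdf, BinaryOperator<T> udf) {
        // Elements sharing a key are merged pairwise with udf; the map keeps first-seen key order.
        return data.stream().collect(
                Collectors.toMap(keyUdf, Function.identity(), udf, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.asList(
                new SimpleEntry<String, Integer>("a", 1),
                new SimpleEntry<String, Integer>("b", 1),
                new SimpleEntry<String, Integer>("a", 1));
        Map<String, Map.Entry<String, Integer>> counts = reduceByKey(
                pairs,
                Map.Entry::getKey,
                (t1, t2) -> new SimpleEntry<>(t1.getKey(), t1.getValue() + t2.getValue()));
        System.out.println(counts.values()); // [a=2, b=1]
    }
}
```

Distributed engines may combine partial results in any order. The aggregation function should therefore be associative and commutative, as the sum above is.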

reduce / globalReduce

Aggregate all elements in the dataset into a single value.
.reduce((a, b) -> a + b)
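A global reduce folds the whole dataset with one binary function. The sketch below models it with a parallel java.util.stream, which, like a distributed engine, may combine partial results in any grouping. The Optional return type and the class name GlobalReduceSketch are choices of this sketch, not Wayang's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java model of a global reduce; not Wayang code.
class GlobalReduceSketch {

    // Combines all elements with udf; an empty input yields Optional.empty().
    static <T> Optional<T> reduce(List<T> data, BinaryOperator<T> udf) {
        // parallelStream() may group partial results arbitrarily, so udf must be associative.
        return data.parallelStream().reduce(udf);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(reduce(numbers, Integer::sum)); // Optional[55]
        System.out.println(reduce(numbers, Integer::max)); // Optional[10]
    }
}
```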

groupByKey / materializedGroupBy

Group elements by a key and collect each group into an Iterable.
.groupByKey(word -> word.charAt(0))
// returns DataQuantaBuilder<Iterable<String>>
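Here is a plain-Java model of groupByKey, assuming a group is simply every element that shares a key. Wayang's groupByKey, per the text above, yields only the groups. This hypothetical helper instead returns a map so the keys stay visible, and the TreeMap just makes the output order deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of groupByKey; not Wayang code.
class GroupByKeySketch {

    // Collects every element into the group of its key; each group keeps encounter order.
    static <T, K extends Comparable<K>> Map<K, List<T>> groupByKey(List<T> data, Function<T, K> keyUdf) {
        return data.stream().collect(Collectors.groupingBy(keyUdf, TreeMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("apple", "banana", "avocado", "blueberry", "cherry");
        System.out.println(groupByKey(words, w -> w.charAt(0)));
        // {a=[apple, avocado], b=[banana, blueberry], c=[cherry]}
    }
}
```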

count

Count the total number of elements; returns DataQuantaBuilder<Long>.
.count()

sort

Sort elements by a key extractor.
.sort(Tuple2::getField1)

distinct

Remove duplicate elements.
.distinct()

union

Merge two datasets of the same type.
DataQuantaBuilder<?, String> left  = planBuilder.readTextFile("file:///a.txt");
DataQuantaBuilder<?, String> right = planBuilder.readTextFile("file:///b.txt");
left.union(right)
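The sketch below assumes union has bag semantics (duplicates are kept, as in SQL UNION ALL). It shows how following union with distinct gives a set-style union. It uses plain Java lists; list order here is an artifact of the sketch, not something to rely on in Wayang. UnionSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical plain-Java model; assumes union keeps duplicates (bag semantics).
class UnionSketch {

    // Bag union: every element of both inputs survives, duplicates included.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // Set-style union: the bag union followed by a distinct step.
    static <T> List<T> unionDistinct(List<T> left, List<T> right) {
        return unionAll(left, right).stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("x", "y");
        List<String> b = Arrays.asList("y", "z");
        System.out.println(unionAll(a, b));      // [x, y, y, z]
        System.out.println(unionDistinct(a, b)); // [x, y, z]
    }
}
```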

intersect

Keep only elements present in both datasets.
left.intersect(right)

join / joinJava

Equi-join two datasets on extracted keys. The result type is Tuple2<Left, Right>.
users.join(
    u -> u.getId(),     // key from left
    orders,
    o -> o.getUserId()  // key from right
)
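An equi-join can be evaluated as a hash join: index one side by key, then probe that index with the other side. The plain-Java sketch below illustrates the result shape, which is one pair per matching left/right combination with unmatched elements dropped. Map.Entry stands in for Tuple2, JoinSketch is hypothetical, and the physical join strategy Wayang picks on each platform may differ.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java hash join; Map.Entry stands in for Tuple2<Left, Right>.
class JoinSketch {

    static <L, R, K> List<Map.Entry<L, R>> join(List<L> left, Function<L, K> leftKey,
                                               List<R> right, Function<R, K> rightKey) {
        // Build phase: index the right side by its key.
        Map<K, List<R>> index = new HashMap<>();
        for (R r : right) {
            index.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe phase: emit one pair per matching (left, right) combination.
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) {
            for (R r : index.getOrDefault(leftKey.apply(l), new ArrayList<>())) {
                out.add(new SimpleEntry<>(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("1:alice", "2:bob");
        List<String> orders = Arrays.asList("1:book", "1:pen", "3:cup");
        Function<String, String> id = s -> s.substring(0, s.indexOf(':'));
        System.out.println(join(users, id, orders, id)); // [1:alice=1:book, 1:alice=1:pen]
    }
}
```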

coGroup

Like join, but produces pairs of Iterable groups instead of flat pairs.
users.coGroup(u -> u.getId(), orders, o -> o.getUserId())
// DataQuantaBuilder<Tuple2<Iterable<User>, Iterable<Order>>>
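Here is a plain-Java model of coGroup. It assumes, as in other dataflow engines, that a key present on only one side is paired with an empty group from the other side. CoGroupSketch is hypothetical, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical single-machine model of coGroup: key -> (left group, right group).
class CoGroupSketch {

    private static <L, R> Map.Entry<List<L>, List<R>> emptyGroups() {
        return new SimpleEntry<>(new ArrayList<L>(), new ArrayList<R>());
    }

    static <L, R, K> Map<K, Map.Entry<List<L>, List<R>>> coGroup(
            List<L> left, Function<L, K> leftKey, List<R> right, Function<R, K> rightKey) {
        Map<K, Map.Entry<List<L>, List<R>>> groups = new LinkedHashMap<>();
        for (L l : left) {
            groups.computeIfAbsent(leftKey.apply(l), k -> CoGroupSketch.<L, R>emptyGroups()).getKey().add(l);
        }
        for (R r : right) {
            // Keys seen only on the right still get an (empty) left group.
            groups.computeIfAbsent(rightKey.apply(r), k -> CoGroupSketch.<L, R>emptyGroups()).getValue().add(r);
        }
        return groups;
    }

    public static void main(String[] args) {
        Function<String, String> id = s -> s.substring(0, s.indexOf(':'));
        coGroup(Arrays.asList("1:alice", "2:bob"), id, Arrays.asList("1:book", "1:pen", "3:cup"), id)
                .forEach((key, pair) -> System.out.println(key + " -> " + pair.getKey() + " / " + pair.getValue()));
        // 1 -> [1:alice] / [1:book, 1:pen]
        // 2 -> [2:bob] / []
        // 3 -> [] / [3:cup]
    }
}
```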

cartesian

Compute the Cartesian product of two datasets.
left.cartesian(right)
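The output size of a Cartesian product is the product of the input sizes, so two datasets of 10,000 elements each yield 100,000,000 pairs. Here is a minimal plain-Java nested-loop version. CartesianSketch is a hypothetical name, and Map.Entry stands in for the pair type.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of cartesian; not Wayang code.
class CartesianSketch {

    // Pairs every left element with every right element: |left| * |right| outputs.
    static <L, R> List<Map.Entry<L, R>> cartesian(List<L> left, List<R> right) {
        List<Map.Entry<L, R>> out = new ArrayList<>(left.size() * right.size());
        for (L l : left) {
            for (R r : right) {
                out.add(new SimpleEntry<>(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = cartesian(Arrays.asList("a", "b"), Arrays.asList(1, 2, 3));
        System.out.println(pairs.size()); // 6
        System.out.println(pairs);        // [a=1, a=2, a=3, b=1, b=2, b=3]
    }
}
```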

sample

Sample a fixed number of elements (optionally with a seed).
.sample(1000)
.sample(1000, SampleOperator.UNKNOWN_DATASET_SIZE, Optional.of(42L), SampleOperator.Methods.RANDOM)
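SampleOperator supports several sampling methods, and this page does not specify which algorithm each platform uses. The plain-Java sketch below uses reservoir sampling (Algorithm R) to illustrate how a seed makes a fixed-size sample reproducible. Reservoir sampling is a standard single-pass technique and not necessarily the one behind SampleOperator.Methods.RANDOM. ReservoirSample is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch: reservoir sampling (Algorithm R), a uniform k-element sample in one pass.
class ReservoirSample {

    static <T> List<T> sample(Iterable<T> data, int k, long seed) {
        Random rnd = new Random(seed);            // fixed seed => reproducible sample
        List<T> reservoir = new ArrayList<>(k);
        int seen = 0;
        for (T item : data) {
            if (seen < k) {
                reservoir.add(item);               // fill the reservoir first
            } else {
                int j = rnd.nextInt(seen + 1);     // uniform in [0, seen]
                if (j < k) {
                    reservoir.set(j, item);        // keep item with probability k / (seen + 1)
                }
            }
            seen++;
        }
        return reservoir;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            data.add(i);
        }
        System.out.println(sample(data, 5, 42L));
    }
}
```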

doWhile / doWhileJava

Execute a loop body until a convergence condition holds.
initialData.doWhileJava(
    convergenceSet -> convergenceSet.size() < threshold,
    loopInput -> {
        DataQuantaBuilder<T> next = loopInput.map(...);
        DataQuantaBuilder<T> conv = next.filter(...);
        return new WayangTuple<>(next, conv);
    },
    20  // numExpectedIterations
)
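Here is a single-machine model of the loop above. It assumes, as the name suggests, that the body runs at least once and that the loop repeats while the condition returns true for the convergence set. Step, doWhile, and halveUntilAtMostOne are hypothetical. The sketch also ignores numExpectedIterations, which, going by its name, is an estimate rather than a limit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical single-machine model of a do-while loop over a dataset; not Wayang's implementation.
class DoWhileSketch {

    // What one run of the loop body produces: the next dataset and the convergence set.
    static final class Step<T, C> {
        final List<T> next;
        final List<C> convergence;

        Step(List<T> next, List<C> convergence) {
            this.next = next;
            this.convergence = convergence;
        }
    }

    static <T, C> List<T> doWhile(List<T> initial, Predicate<List<C>> condition,
                                  Function<List<T>, Step<T, C>> body) {
        List<T> current = initial;
        Step<T, C> step;
        do {
            step = body.apply(current);              // the body always runs at least once
            current = step.next;
        } while (condition.test(step.convergence));  // loop again while the condition holds
        return current;
    }

    // Hypothetical example: halve every value until none is greater than 1.
    static List<Integer> halveUntilAtMostOne(List<Integer> start) {
        return DoWhileSketch.<Integer, Integer>doWhile(
                start,
                conv -> !conv.isEmpty(),
                data -> {
                    List<Integer> next = data.stream().map(x -> x / 2).collect(Collectors.toList());
                    List<Integer> conv = next.stream().filter(x -> x > 1).collect(Collectors.toList());
                    return new Step<>(next, conv);
                });
    }

    public static void main(String[] args) {
        System.out.println(halveUntilAtMostOne(Arrays.asList(8, 4))); // [1, 0]
        System.out.println(halveUntilAtMostOne(Arrays.asList(1)));    // [0]
    }
}
```

The second call shows the do-while behavior. [1] already satisfies the exit condition, yet the body still runs once and produces [0].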

repeat / repeatJava

Execute a loop body a fixed number of times.
initialCentroids.repeat(100, current ->
    current.map(reassignPoints)
           .reduceByKey(TaggedPoint::getCluster, TaggedPoint::sum)
           .map(TaggedPoint::average)
)
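The repeat example reads as a k-means loop, where reassignPoints, TaggedPoint, and its methods are placeholders. Under that reading, the plain-Java sketch below fills in a one-dimensional version. Each iteration assigns points to the nearest centroid and averages each cluster, and repeat feeds the result back in a fixed number of times. All names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical plain-Java model of repeat plus a 1-D k-means body; not Wayang code.
class RepeatSketch {

    // repeat(n, body): feed the body's output back into it exactly n times.
    static <T> T repeat(int n, T initial, UnaryOperator<T> body) {
        T current = initial;
        for (int i = 0; i < n; i++) {
            current = body.apply(current);
        }
        return current;
    }

    // One k-means step: assign each point to its nearest centroid, then average per cluster.
    static List<Double> kMeansStep(List<Double> points, List<Double> centroids) {
        double[] sums = new double[centroids.size()];
        int[] counts = new int[centroids.size()];
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centroids.size(); c++) {
                if (Math.abs(p - centroids.get(c)) < Math.abs(p - centroids.get(best))) {
                    best = c;
                }
            }
            sums[best] += p;
            counts[best]++;
        }
        List<Double> next = new ArrayList<>();
        for (int c = 0; c < centroids.size(); c++) {
            // Keep an empty cluster's old centroid instead of dividing by zero.
            next.add(counts[c] == 0 ? centroids.get(c) : sums[c] / counts[c]);
        }
        return next;
    }

    public static void main(String[] args) {
        List<Double> points = Arrays.asList(1.0, 2.0, 3.0, 10.0, 11.0, 12.0);
        List<Double> result = repeat(100, Arrays.asList(1.0, 12.0), cs -> kMeansStep(points, cs));
        System.out.println(result); // [2.0, 11.0]
    }
}
```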

Sink operators (trigger execution)

Calling a sink method compiles the logical plan, runs the optimizer, and executes the job.

collect

Collect all output elements into a Java Collection on the driver.
Collection<Tuple2<String, Integer>> results = planBuilder
        .readTextFile("file:///input.txt")
        .flatMap(line -> Arrays.asList(line.split("\\W+")))
        .map(w -> new Tuple2<>(w, 1))
        .reduceByKey(Tuple2::getField0, (a, b) -> new Tuple2<>(a.getField0(), a.getField1() + b.getField1()))
        .collect();

writeTextFile

Write each element, formatted by a UDF, to a text file. On DataQuantaBuilder, writeTextFile also takes a required jobName argument that labels the job the sink triggers.
.writeTextFile("file:///output.txt", t -> t.getField0() + "\t" + t.getField1(), "WriteResults")
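The formatter UDF turns each element into one line of text. This plain-Java sketch models that step and writes the lines to a local file with java.nio. Map.Entry stands in for Tuple2, and WriteTextFileSketch, format, and writeTextFile are hypothetical helpers, not Wayang API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java model of a text sink with a formatter UDF.
class WriteTextFileSketch {

    // Applies the formatter UDF to every element: one output line per element.
    static <T> List<String> format(List<T> data, Function<T, String> formatter) {
        return data.stream().map(formatter).collect(Collectors.toList());
    }

    static <T> void writeTextFile(Path target, List<T> data, Function<T, String> formatter) throws IOException {
        Files.write(target, format(data, formatter)); // UTF-8, one line per element
    }

    public static void main(String[] args) throws IOException {
        List<Map.Entry<String, Integer>> counts = Arrays.asList(
                new SimpleEntry<String, Integer>("wayang", 3),
                new SimpleEntry<String, Integer>("spark", 1));
        Path out = Files.createTempFile("wordcount", ".txt");
        writeTextFile(out, counts, t -> t.getKey() + "\t" + t.getValue());
        System.out.println(Files.readAllLines(out).size()); // 2
        Files.delete(out);
    }
}
```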

foreach / foreachJava

Apply a side-effecting action to each element.
.foreach(System.out::println)

DataQuantaBuilder annotation methods

Every DataQuantaBuilder exposes the following fluent annotations. These do not change the data flow — they provide information to the optimizer and logging infrastructure.
Method | Purpose
.withName(String name) | Labels the operator in logs and plan visualizations.
.withSelectivity(double lowerBound, double upperBound, double confidence) | Hints to the optimizer how much data this operator produces relative to its input.
.withCardinalityEstimator(CardinalityEstimator estimator) | Plugs in a custom cardinality estimator for more accurate cost modeling.
.withTargetPlatform(Platform platform) | Restricts execution of this operator to a specific platform.
.withUdfJarOf(Class<?> cls) | Ensures the JAR containing cls is shipped to remote engines.
.withBroadcast(DataQuantaBuilder<?> sender, String name) | Makes the sender dataset available as a broadcast variable inside this operator's UDF.
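The simplified model below shows how selectivity hints turn into cardinality estimates. It assumes each bound of the input cardinality is scaled by the corresponding selectivity bound, and it ignores the confidence value. The numbers follow the WordCount plan on this page, including its custom estimator in -> Math.round(0.01 * in[0]). SelectivityEstimate is a hypothetical name, and this is not Wayang's estimator code.

```java
import java.util.Arrays;

// Hypothetical simplified model: scale each cardinality bound by a selectivity bound.
class SelectivityEstimate {

    // input = {lower, upper} cardinality; returns the scaled {lower, upper} estimate.
    static long[] apply(long[] input, double lowerSelectivity, double upperSelectivity) {
        return new long[] {
                Math.round(input[0] * lowerSelectivity),
                Math.round(input[1] * upperSelectivity)
        };
    }

    public static void main(String[] args) {
        long[] lines = {1_000, 1_000};                  // assumed input: exactly 1,000 lines
        long[] words = apply(lines, 10, 100);           // flatMap .withSelectivity(10, 100, 0.9)
        long[] nonEmpty = apply(words, 0.99, 0.99);     // filter .withSelectivity(0.99, 0.99, 0.99)
        long[] distinct = apply(nonEmpty, 0.01, 0.01);  // reduceByKey: in -> Math.round(0.01 * in[0])
        System.out.println(Arrays.toString(words));     // [10000, 100000]
        System.out.println(Arrays.toString(nonEmpty));  // [9900, 99000]
        System.out.println(Arrays.toString(distinct));  // [99, 990]
    }
}
```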

Complete Java WordCount example

The WordCount pipeline is the canonical demonstration of Wayang's API. It runs unchanged on the local Java Streams engine, on Apache Spark, or on both; you only change which plugins you register.
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import java.util.Arrays;
import java.util.Collection;

public class WordCountJava {

    public static void main(String[] args) {

        String inputUrl = "file:///tmp/words.txt";

        // Register execution platforms.
        WayangContext wayangContext = new WayangContext(new Configuration())
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());

        JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
                .withJobName(String.format("WordCount (%s)", inputUrl))
                .withUdfJarOf(WordCountJava.class);

        // Build and execute the plan.
        Collection<Tuple2<String, Integer>> wordcounts = planBuilder
                // 1. Read the text file — one String per line.
                .readTextFile(inputUrl).withName("Load file")

                // 2. Split each line into words.
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .withSelectivity(10, 100, 0.9)
                .withName("Split words")

                // 3. Drop empty tokens (split yields them for lines starting with a non-word character and for blank lines).
                .filter(token -> !token.isEmpty())
                .withSelectivity(0.99, 0.99, 0.99)
                .withName("Filter empty words")

                // 4. Pair each word with count 1.
                .map(word -> new Tuple2<>(word.toLowerCase(), 1))
                .withName("To lower case, add counter")

                // 5. Sum counts per word.
                .reduceByKey(
                        Tuple2::getField0,
                        (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1())
                )
                .withCardinalityEstimator(
                        new DefaultCardinalityEstimator(0.9, 1, false, in -> Math.round(0.01 * in[0]))
                )
                .withName("Sum counts")

                // 6. Collect results back to the driver.
                .collect();

        wordcounts.forEach(System.out::println);
    }
}
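This example registers both the Java and the Spark plugin, so the cost-based optimizer decides which engine runs each operator and may split the job across them. To pin the job to a single engine, register only that engine's plugin (for example, only Java.basicPlugin()).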
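Because the plan itself is platform-independent, a single-machine reference built from java.util.stream can be handy for checking expected output on small inputs in unit tests. This is a sketch, not Wayang code. WordCountReference is hypothetical, and the result is a sorted map rather than a collection of Tuple2 pairs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical single-machine reference for the WordCount plan, using java.util.stream only.
class WordCountReference {

    static Map<String, Integer> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\W+")))  // Split words
                .filter(token -> !token.isEmpty())                   // Filter empty words
                .map(String::toLowerCase)                            // To lower case
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum, TreeMap::new)); // Sum counts
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("The quick fox.", "", "the lazy dog, the end");
        System.out.println(wordCount(lines));
        // {dog=1, end=1, fox=1, lazy=1, quick=1, the=3}
    }
}
```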

Broadcast variables

Broadcast variables let you ship a small reference dataset to every executor so that UDFs can read from it without a join.
Collection<String> stopWords = Arrays.asList("the", "a", "an");

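DataQuantaBuilder<?, String> stopWordsDQ = planBuilder
        .loadCollection(stopWords)
        .withName("Stop words");

planBuilder
    .readTextFile("file:///corpus.txt")
    .flatMap(line -> Arrays.asList(line.split("\\W+")))
    .filter(new FunctionDescriptor.ExtendedSerializablePredicate<String>() {
        private Collection<String> broadcastStopWords;
        @Override
        public void open(ExecutionContext ctx) {
            // Called before test() is applied to any element; fetches the broadcast by name.
            this.broadcastStopWords = ctx.getBroadcast("stopWords");
        }
        @Override
        public boolean test(String word) {
            return !this.broadcastStopWords.contains(word.toLowerCase());
        }
    })
    .withBroadcast(stopWordsDQ, "stopWords")
    .collect();
A filter UDF that reads a broadcast implements FunctionDescriptor.ExtendedSerializablePredicate (map-style UDFs use ExtendedSerializableFunction instead). Wayang calls its open() method with an ExecutionContext before processing elements, and executionContext.getBroadcast("stopWords") returns the broadcast collection. FunctionDescriptor and ExecutionContext live in org.apache.wayang.core.function.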
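The open()-then-test lifecycle can be mimicked in plain Java to see why the broadcast is read in open(): the collection is fetched once and then reused for every element. OpenablePredicate, StopWordFilter, BroadcastSketch, and the map of broadcasts are hypothetical stand-ins for ExtendedSerializablePredicate and ExecutionContext, not Wayang API.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for an extended UDF: open() receives broadcasts before test() runs.
interface OpenablePredicate<T> extends Predicate<T> {
    void open(Map<String, Collection<?>> broadcasts);
}

// Hypothetical stop-word filter mirroring the Wayang example above.
class StopWordFilter implements OpenablePredicate<String> {
    private Set<String> stopWords;
    int openCalls = 0;

    @Override
    @SuppressWarnings("unchecked")
    public void open(Map<String, Collection<?>> broadcasts) {
        openCalls++;
        // Copy the broadcast once into a HashSet so each test() is a constant-time lookup.
        stopWords = new HashSet<>((Collection<String>) broadcasts.get("stopWords"));
    }

    @Override
    public boolean test(String word) {
        return !stopWords.contains(word.toLowerCase());
    }
}

class BroadcastSketch {

    // Opens the UDF once with the broadcast, then applies it to every element.
    static List<String> filterWithBroadcast(List<String> words, Collection<String> stopWords,
                                            OpenablePredicate<String> udf) {
        udf.open(Collections.<String, Collection<?>>singletonMap("stopWords", stopWords));
        return words.stream().filter(udf).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        StopWordFilter udf = new StopWordFilter();
        List<String> kept = filterWithBroadcast(
                Arrays.asList("The", "cat", "sat", "on", "a", "mat"),
                Arrays.asList("the", "a", "an", "on"),
                udf);
        System.out.println(kept + " after " + udf.openCalls + " open() call(s)");
        // [cat, sat, mat] after 1 open() call(s)
    }
}
```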

Writing results

.writeTextFile(
    "file:///output/results.txt",
    tuple -> tuple.getField0() + "\t" + tuple.getField1(),
    "WriteResults"
);
