
Apache Wayang separates the logical description of a computation from its physical execution on a specific platform. When you add a new operator, you define it at both levels and then wire them together with a Mapping. This page walks through every step using the built-in MapOperator and FilterOperator as reference patterns.

Overview: the four pieces

Every new operator family requires four things:
  1. A logical operator — describes what the computation does, platform-agnostically.
  2. One or more execution operators — platform-specific classes that perform the actual computation (e.g., Spark, Java).
  3. A Mapping — tells Wayang’s optimizer how to replace the logical operator with an execution alternative.
  4. A Plugin — bundles mappings, channel conversions, and platform registrations so WayangContext.withPlugin(...) can load them.

Step 1 — Define the logical operator

Logical operators live in wayang-commons/wayang-basic. They extend the appropriate base class depending on their arity:

| Base class | Use case |
| --- | --- |
| `UnaryToUnaryOperator<In, Out>` | One input, one output (e.g., map, filter) |
| `BinaryToUnaryOperator<In0, In1, Out>` | Two inputs, one output (e.g., join) |
| `UnarySource<Out>` | No input, one output (e.g., file source) |
| `UnarySink<In>` | One input, no output (e.g., file sink) |

```java
package com.example.wayang.operators;

import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.types.DataSetType;

import java.util.Optional;

/**
 * A logical "double-map" operator that applies two functions in sequence.
 * Logical operators are platform-agnostic; they carry only the computation
 * descriptors and the cardinality estimation logic.
 */
public class DoubleMapOperator<InputType, MidType, OutputType>
        extends UnaryToUnaryOperator<InputType, OutputType> {

    private final TransformationDescriptor<InputType, MidType> firstDescriptor;
    private final TransformationDescriptor<MidType, OutputType> secondDescriptor;

    public DoubleMapOperator(
            TransformationDescriptor<InputType, MidType> first,
            TransformationDescriptor<MidType, OutputType> second,
            DataSetType<InputType> inputType,
            DataSetType<OutputType> outputType) {
        // true: support broadcast inputs, as MapOperator does
        super(inputType, outputType, true);
        this.firstDescriptor = first;
        this.secondDescriptor = second;
    }

    /** Copy constructor, following the MapOperator pattern. */
    public DoubleMapOperator(DoubleMapOperator<InputType, MidType, OutputType> that) {
        super(that);
        this.firstDescriptor = that.getFirstDescriptor();
        this.secondDescriptor = that.getSecondDescriptor();
    }

    public TransformationDescriptor<InputType, MidType> getFirstDescriptor() {
        return firstDescriptor;
    }

    public TransformationDescriptor<MidType, OutputType> getSecondDescriptor() {
        return secondDescriptor;
    }

    @Override
    public Optional<CardinalityEstimator> createCardinalityEstimator(
            int outputIndex, Configuration configuration) {
        // 1:1 cardinality: output row count equals input row count
        return Optional.of(new DefaultCardinalityEstimator(
                1d, 1, this.isSupportingBroadcastInputs(),
                inputCards -> inputCards[0]));
    }
}
```

Study MapOperator (wayang-commons/wayang-basic/.../operators/MapOperator.java) as the canonical reference. It shows how to wrap a TransformationDescriptor, copy-construct, and wire a cardinality estimator.
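The lambda passed to DefaultCardinalityEstimator above turns the array of input cardinalities into a single output cardinality. As a stand-alone illustration with no Wayang dependency, the sketch below models that function as a ToLongFunction<long[]> (an assumption about its shape) and chains two 1:1 estimates to show that a double map still reports the input count. CardinalitySketch, ONE_TO_ONE, and chain are hypothetical names; the real estimator also carries a certainty value (the 1d argument) that this sketch leaves out.

```java
import java.util.function.ToLongFunction;

/** Hypothetical stand-alone model of a single-point cardinality function (not Wayang's API). */
public class CardinalitySketch {

    /** 1:1 estimate, same shape as the lambda in DoubleMapOperator: output count = first input count. */
    static final ToLongFunction<long[]> ONE_TO_ONE = inputCards -> inputCards[0];

    /** Chains two single-input estimates: the first stage's output count feeds the second stage. */
    static ToLongFunction<long[]> chain(ToLongFunction<long[]> first, ToLongFunction<long[]> second) {
        return inputCards -> second.applyAsLong(new long[]{first.applyAsLong(inputCards)});
    }

    public static void main(String[] args) {
        ToLongFunction<long[]> doubleMap = chain(ONE_TO_ONE, ONE_TO_ONE);
        System.out.println(doubleMap.applyAsLong(new long[]{1_000L})); // prints 1000
    }
}
```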

Step 2 — Implement an execution operator

Execution operators live inside a platform module (e.g., wayang-platforms/wayang-spark). They extend the logical operator and implement the platform-specific interface:
```java
package com.example.wayang.spark.operators;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import com.example.wayang.operators.DoubleMapOperator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Spark execution operator for {@link DoubleMapOperator}: runs both
 * functions as chained RDD map transformations.
 */
public class SparkDoubleMapOperator<InputType, MidType, OutputType>
        extends DoubleMapOperator<InputType, MidType, OutputType>
        implements SparkExecutionOperator {

    /** Creates the Spark variant of a matched logical operator. */
    public SparkDoubleMapOperator(DoubleMapOperator<InputType, MidType, OutputType> that) {
        super(that.getFirstDescriptor(), that.getSecondDescriptor(),
              that.getInputType(), that.getOutputType());
    }

    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
            ChannelInstance[] inputs,
            ChannelInstance[] outputs,
            SparkExecutor sparkExecutor,
            OptimizationContext.OperatorContext operatorContext) {

        RddChannel.Instance input  = (RddChannel.Instance) inputs[0];
        RddChannel.Instance output = (RddChannel.Instance) outputs[0];

        // Compile both Wayang function descriptors into Spark functions.
        Function<InputType, MidType> f1 =
            sparkExecutor.getCompiler().compile(this.getFirstDescriptor(),
                                                this, operatorContext, inputs);
        Function<MidType, OutputType> f2 =
            sparkExecutor.getCompiler().compile(this.getSecondDescriptor(),
                                                this, operatorContext, inputs);

        // Both maps are lazy Spark transformations; nothing runs until an action.
        JavaRDD<OutputType> result = input.<InputType>provideRdd()
            .map(f1)
            .map(f2);
        this.name(result);
        output.accept(result, sparkExecutor);

        // No Spark action was triggered here, so model the execution as lazy.
        return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
    }

    @Override
    public String getLoadProfileEstimatorConfigurationKey() {
        // Must match the key the plugin sets in setProperties(...)
        return "com.example.spark.doublemap.load";
    }

    @Override
    public Optional<LoadProfileEstimator> createLoadProfileEstimator(
            Configuration configuration) {
        return Optional.ofNullable(
            LoadProfileEstimators.createFromSpecification(
                this.getLoadProfileEstimatorConfigurationKey(), configuration));
    }

    @Override
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        return Arrays.asList(RddChannel.UNCACHED_DESCRIPTOR,
                             RddChannel.CACHED_DESCRIPTOR);
    }

    @Override
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR);
    }

    @Override
    public boolean containsAction() {
        // map() is a Spark transformation, not an action
        return false;
    }

    @Override
    protected ExecutionOperator createCopy() {
        return new SparkDoubleMapOperator<>(this);
    }
}
```
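For reference, the two chained map calls in evaluate(...) can be reproduced with java.util.stream.Stream, so the semantics are checkable without a Spark cluster. This is a plain-Java sketch, not a Wayang Java-platform operator; DoubleMapSemantics, twoStage, and fused are hypothetical names. It also checks that map(f1).map(f2) yields the same elements as a single map(f1.andThen(f2)), so, assuming both functions are side-effect free, an execution operator for another platform could fuse them.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical plain-Java model of the double-map semantics (not Wayang or Spark code). */
public class DoubleMapSemantics {

    /** Two chained map stages, mirroring input.provideRdd().map(f1).map(f2). */
    static <I, M, O> List<O> twoStage(List<I> input, Function<I, M> f1, Function<M, O> f2) {
        return input.stream().map(f1).map(f2).collect(Collectors.toList());
    }

    /** The same computation fused into a single stage with Function.andThen. */
    static <I, M, O> List<O> fused(List<I> input, Function<I, M> f1, Function<M, O> f2) {
        return input.stream().map(f1.andThen(f2)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("wayang", "spark", "java");
        Function<String, Integer> length = String::length;
        Function<Integer, Integer> square = n -> n * n;
        System.out.println(twoStage(words, length, square)); // prints [36, 25, 16]
        System.out.println(fused(words, length, square));    // prints [36, 25, 16]
    }
}
```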

Channel types reference

| Channel class | Module | When to use |
| --- | --- | --- |
| `RddChannel` | wayang-spark | Default Spark channel; carries a `JavaRDD<T>`. |
| `DatasetChannel` | wayang-spark | Schema-aware Spark `Dataset<Row>`. Use for Parquet/lakehouse pipelines. |
| `StreamChannel` | wayang-java | Java `Stream<T>` channel for the Java platform. |
| `CollectionChannel` | wayang-basic | An in-memory `Collection<T>`, platform-neutral. |

Step 3 — Create a Mapping

A Mapping implements org.apache.wayang.core.mapping.Mapping and returns a collection of PlanTransformation objects. Each PlanTransformation pairs a SubplanPattern (what to match) with a ReplacementSubplanFactory (what to create). The mapping below follows the structure of FilterMapping from the Spark platform, the clearest real-world example of this pattern:
```java
package com.example.wayang.spark.mapping;

import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.platform.SparkPlatform;
import com.example.wayang.operators.DoubleMapOperator;
import com.example.wayang.spark.operators.SparkDoubleMapOperator;

import java.util.Collection;
import java.util.Collections;

/**
 * Maps logical {@link DoubleMapOperator} to {@link SparkDoubleMapOperator}.
 */
@SuppressWarnings("unchecked")
public class DoubleMapMapping implements Mapping {

    @Override
    public Collection<PlanTransformation> getTransformations() {
        return Collections.singleton(new PlanTransformation(
            this.createSubplanPattern(),
            this.createReplacementSubplanFactory(),
            SparkPlatform.getInstance()  // restrict to Spark
        ));
    }

    private SubplanPattern createSubplanPattern() {
        // Match any DoubleMapOperator node in the plan
        final OperatorPattern pattern = new OperatorPattern(
            "doubleMap",
            new DoubleMapOperator<>(null, null,
                DataSetType.none(), DataSetType.none()),
            false
        );
        return SubplanPattern.createSingleton(pattern);
    }

    private ReplacementSubplanFactory createReplacementSubplanFactory() {
        return new ReplacementSubplanFactory.OfSingleOperators<DoubleMapOperator>(
            (matched, epoch) -> new SparkDoubleMapOperator<>(matched).at(epoch)
        );
    }
}
```
PlanTransformation carries targetPlatforms so the optimizer knows which platform the replacement targets. Passing SparkPlatform.getInstance() ensures the mapping is only applied when the Spark platform is active in the current context.
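As a toy model of this platform restriction (the Transformation class, the platform strings, and applicable are hypothetical, not Wayang's API), each transformation records its target platforms, and only those whose targets include an active platform are kept. With a single target, as in DoubleMapMapping, this reduces to checking whether Spark is registered in the context.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model of platform-restricted transformations (hypothetical, not Wayang's API). */
public class PlatformFilterSketch {

    /** A transformation, named after its mapping and tagged with the platforms it targets. */
    static final class Transformation {
        final String name;
        final Set<String> targetPlatforms;

        Transformation(String name, Set<String> targetPlatforms) {
            this.name = name;
            this.targetPlatforms = targetPlatforms;
        }
    }

    /** Keeps transformations that target at least one platform active in the context. */
    static List<String> applicable(List<Transformation> all, Set<String> activePlatforms) {
        return all.stream()
                .filter(t -> t.targetPlatforms.stream().anyMatch(activePlatforms::contains))
                .map(t -> t.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Transformation> all = List.of(
                new Transformation("DoubleMapMapping", Set.of("spark")),
                // hypothetical Java-platform counterpart
                new Transformation("JavaDoubleMapMapping", Set.of("java")));
        System.out.println(applicable(all, Set.of("spark"))); // prints [DoubleMapMapping]
    }
}
```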

How PlanTransformation works

Internally, PlanTransformation.transform(WayangPlan, epoch) does two things:
  1. Runs SubplanPattern.match(plan, epoch-1) to collect all matching subgraphs.
  2. Wraps each match in an OperatorAlternative, adding the replacement as a new choice in the plan graph without removing the original.

The optimizer then picks the lowest-cost alternative during plan enumeration.
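The wrap-and-choose cycle can be modeled in plain Java. In this toy sketch, Alternative, Candidate, the operator names (JavaDoubleMapOperator is hypothetical), and the costs are invented for illustration; they are not Wayang internals or measured numbers. Each logical operator keeps its place and collects the execution operators proposed by applicable mappings, and a greedy pass keeps the cheapest candidate per operator. Wayang's real enumeration costs whole subplans, including moving data between platforms, so choosing per operator in isolation is a deliberate simplification.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of wrap-in-alternative and pick-cheapest (hypothetical, not Wayang internals). */
public class AlternativeSketch {

    /** A candidate execution operator with a made-up cost estimate. */
    static final class Candidate {
        final String name;
        final double cost;

        Candidate(String name, double cost) { this.name = name; this.cost = cost; }
    }

    /** Wraps one logical operator; candidates are added next to it rather than replacing it. */
    static final class Alternative {
        final String logicalOperator;
        final List<Candidate> candidates = new ArrayList<>();

        Alternative(String logicalOperator) { this.logicalOperator = logicalOperator; }
    }

    /** Picks the cheapest candidate of every alternative, keyed by logical operator. */
    static Map<String, String> choose(List<Alternative> plan) {
        Map<String, String> choice = new LinkedHashMap<>();
        for (Alternative alt : plan) {
            Candidate best = alt.candidates.stream()
                    .min(Comparator.comparingDouble(c -> c.cost))
                    .orElseThrow(() -> new IllegalStateException("no candidate for " + alt.logicalOperator));
            choice.put(alt.logicalOperator, best.name);
        }
        return choice;
    }

    public static void main(String[] args) {
        Alternative doubleMap = new Alternative("DoubleMapOperator");
        // Each applicable mapping contributes one candidate (costs are invented).
        doubleMap.candidates.add(new Candidate("SparkDoubleMapOperator", 12.0));
        doubleMap.candidates.add(new Candidate("JavaDoubleMapOperator", 3.5));
        System.out.println(choose(List.of(doubleMap))); // prints {DoubleMapOperator=JavaDoubleMapOperator}
    }
}
```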

Step 4 — Bundle in a Plugin and register

A Plugin groups platforms, mappings, and channel conversions so they can be loaded with a single withPlugin(...) call.
```java
package com.example.wayang.spark;

import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.optimizer.channels.ChannelConversion;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.plugin.Plugin;
import org.apache.wayang.spark.platform.SparkPlatform;
import com.example.wayang.spark.mapping.DoubleMapMapping;

import java.util.Collection;
import java.util.Collections;

public class ExampleSparkPlugin implements Plugin {

    @Override
    public Collection<Platform> getRequiredPlatforms() {
        return Collections.singletonList(SparkPlatform.getInstance());
    }

    @Override
    public Collection<Mapping> getMappings() {
        return Collections.singletonList(new DoubleMapMapping());
    }

    @Override
    public Collection<ChannelConversion> getChannelConversions() {
        // No new channel types, so no extra conversions are needed
        return Collections.emptyList();
    }

    @Override
    public void setProperties(Configuration configuration) {
        // Default load-profile specification under the key returned by
        // SparkDoubleMapOperator#getLoadProfileEstimatorConfigurationKey().
        // "..." is a placeholder: replace it with a real specification before use.
        configuration.setProperty("com.example.spark.doublemap.load", "...");
    }
}
```
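Because the operator looks up its load-profile specification by key, the string used in setProperties has to match getLoadProfileEstimatorConfigurationKey() exactly. The sketch below models only that handshake with java.util.Properties; LoadKeySketch, setDefaults, and lookup are hypothetical stand-ins and do not reproduce Wayang's Configuration class. In this toy model a mismatched key simply yields an empty Optional.

```java
import java.util.Optional;
import java.util.Properties;

/** Toy model of plugin defaults and operator-side lookup (hypothetical, not Wayang's Configuration). */
public class LoadKeySketch {

    /** Same key string as SparkDoubleMapOperator and ExampleSparkPlugin above. */
    static final String LOAD_KEY = "com.example.spark.doublemap.load";

    /** Plays the role of Plugin#setProperties: registers a default under the operator's key. */
    static void setDefaults(Properties config, String key, String specification) {
        config.setProperty(key, specification);
    }

    /** Plays the role of the operator-side lookup: empty when nothing was registered. */
    static Optional<String> lookup(Properties config, String key) {
        return Optional.ofNullable(config.getProperty(key));
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        setDefaults(config, LOAD_KEY, "<load specification placeholder>");
        System.out.println(lookup(config, LOAD_KEY).isPresent());                       // prints true
        System.out.println(lookup(config, "com.example.spark.doublemap").isPresent()); // prints false
    }
}
```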
Register the plugin when building your WayangContext:
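```java
import com.example.wayang.spark.ExampleSparkPlugin;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.spark.Spark;

// Spark.basicPlugin() registers the Spark platform and its built-in mappings;
// ExampleSparkPlugin adds DoubleMapMapping on top.
WayangContext wayang = new WayangContext()
    .withPlugin(Spark.basicPlugin())
    .withPlugin(new ExampleSparkPlugin());
```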

Build and test

Compile only the module you changed for faster iteration:

```shell
./mvnw clean install -DskipTests -pl wayang-platforms/wayang-spark
```

Before opening a pull request, verify that the full build and tests pass:

```shell
./mvnw clean install
```

Package a distribution to run your operator via wayang-submit:

```shell
./mvnw clean package -pl :wayang-assembly -Pdistribution
cd wayang-assembly/target/
tar -xvf apache-wayang-assembly-*-dist.tar.gz
cd wayang-*/
./bin/wayang-submit com.example.MyJob my-arg
```

Always run ./mvnw clean install (without -DskipTests) before submitting a PR. Skipping tests locally is fine for rapid iteration but can hide serialization and cardinality estimator regressions.

Reference: existing operators as patterns

| Logical operator | Execution operator | Mapping |
| --- | --- | --- |
| MapOperator | SparkMapOperator | wayang-spark/.../mapping/MapMapping.java |
| FilterOperator | SparkFilterOperator | wayang-spark/.../mapping/FilterMapping.java |
| ParquetSource | SparkParquetSource | wayang-spark/.../mapping/ParquetSourceMapping.java |
| CollectionSource | SparkCollectionSource | wayang-spark/.../mapping/CollectionSourceMapping.java |

Start by copying FilterMapping — it is the shortest complete example of the pattern-match-and-replace lifecycle.
