Apache Wayang separates the logical description of a computation from its physical execution on a specific platform. When you add a new operator, you define it at both levels and then wire the two together with a Mapping. This page walks through every step with an example DoubleMapOperator, using the built-in MapOperator and FilterOperator as reference patterns.
Overview: the four pieces
Every new operator family requires four things:
- A logical operator: describes what the computation does, platform-agnostically.
- One or more execution operators: platform-specific classes that perform the actual computation (e.g., Spark, Java).
- A Mapping: tells Wayang's optimizer how to replace the logical operator with an execution alternative.
- A Plugin: bundles mappings, channel conversions, and platform registrations so WayangContext.withPlugin(...) can load them.
Step 1 — Define the logical operator
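Wayang's built-in logical operators live in wayang-commons/wayang-basic; your own can live in any module, as the com.example package in the example below shows. A logical operator extends the base class (from org.apache.wayang.core.plan.wayangplan) that matches its arity: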
| Base class | Use case |
|---|---|
| `UnaryToUnaryOperator<In, Out>` | One input, one output (e.g., map, filter) |
| `BinaryToUnaryOperator<In0, In1, Out>` | Two inputs, one output (e.g., join) |
| `UnarySource<Out>` | No input, one output (e.g., file source) |
| `UnarySink<In>` | One input, no output (e.g., file sink) |
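As a rough intuition for the arity column, the sketch below models each base class as a plain java.util.function shape over whole data sets (lists). These are not Wayang types: ArityAnalogy and its members are hypothetical names that only show how inputs and outputs line up for each arity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Plain-Java analogy for the arity table (NOT Wayang classes): each "operator"
// is modeled as a function over whole data sets, represented here as lists.
class ArityAnalogy {

    // Like UnarySource<Out>: no input, one output.
    static final Supplier<List<Integer>> SOURCE = () -> List.of(1, 2, 3);

    // Like UnaryToUnaryOperator<In, Out>: one input, one output (a map).
    static final Function<List<Integer>, List<Integer>> SQUARE =
            in -> in.stream().map(x -> x * x).collect(Collectors.toList());

    // Like BinaryToUnaryOperator<In0, In1, Out>: two inputs, one output
    // (a naive nested-loop equi-join on the element value).
    static final BiFunction<List<Integer>, List<Integer>, List<String>> JOIN =
            (left, right) -> {
                List<String> out = new ArrayList<>();
                for (Integer l : left) {
                    for (Integer r : right) {
                        if (l.equals(r)) {
                            out.add(l + "=" + r);
                        }
                    }
                }
                return out;
            };

    // Like UnarySink<In>: one input, no output (appends to a caller-owned list).
    static Consumer<List<String>> sinkInto(List<String> target) {
        return target::addAll;
    }

    // Wires source -> map -> join -> sink and returns what the sink received.
    static List<String> run() {
        List<String> collected = new ArrayList<>();
        List<Integer> squares = SQUARE.apply(SOURCE.get());            // [1, 4, 9]
        List<String> joined = JOIN.apply(squares, List.of(4, 9, 16));   // [4=4, 9=9]
        sinkInto(collected).accept(joined);
        return collected;
    }
}
```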
```java
package com.example.wayang.operators;

import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator;
import org.apache.wayang.core.plan.wayangplan.UnaryToUnaryOperator;
import org.apache.wayang.core.types.DataSetType;

import java.util.Optional;

/**
 * A logical "double-map" operator that applies two functions in sequence.
 * Logical operators are platform-agnostic; they carry only the computation
 * descriptors and cardinality estimation logic.
 */
public class DoubleMapOperator<InputType, MidType, OutputType>
        extends UnaryToUnaryOperator<InputType, OutputType> {

    private final TransformationDescriptor<InputType, MidType> firstDescriptor;
    private final TransformationDescriptor<MidType, OutputType> secondDescriptor;

    public DoubleMapOperator(
            TransformationDescriptor<InputType, MidType> first,
            TransformationDescriptor<MidType, OutputType> second,
            DataSetType<InputType> inputType,
            DataSetType<OutputType> outputType) {
        // true: this operator supports broadcast inputs
        super(inputType, outputType, true);
        this.firstDescriptor = first;
        this.secondDescriptor = second;
    }

    /** Copy constructor, following the pattern used by MapOperator. */
    public DoubleMapOperator(DoubleMapOperator<InputType, MidType, OutputType> that) {
        super(that);
        this.firstDescriptor = that.getFirstDescriptor();
        this.secondDescriptor = that.getSecondDescriptor();
    }

    public TransformationDescriptor<InputType, MidType> getFirstDescriptor() {
        return firstDescriptor;
    }

    public TransformationDescriptor<MidType, OutputType> getSecondDescriptor() {
        return secondDescriptor;
    }

    @Override
    public Optional<CardinalityEstimator> createCardinalityEstimator(
            int outputIndex, Configuration configuration) {
        // 1:1 cardinality: output row count equals input row count
        return Optional.of(new DefaultCardinalityEstimator(
                1d, 1, this.isSupportingBroadcastInputs(),
                inputCards -> inputCards[0]));
    }
}
```
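The sketch below spells out, in plain Java rather than Wayang's API, the contract this operator describes: applying the two functions in sequence is equivalent to one composed function, and because each input element yields exactly one output element, the estimator can pass the input cardinality straight through. DoubleMapSemantics and its methods are illustrative names, not part of Wayang.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;

// Plain-Java sketch (NOT Wayang API) of the contract DoubleMapOperator describes.
class DoubleMapSemantics {

    // Per element: apply the first function, then the second, i.e. second(first(x)).
    static <I, M, O> Function<I, O> fuse(Function<I, M> first, Function<M, O> second) {
        return first.andThen(second);
    }

    // Applies a per-element function to a whole data set: one output per input.
    static <I, O> List<O> mapAll(List<I> input, Function<I, O> f) {
        return input.stream().map(f).collect(Collectors.toList());
    }

    // Same shape as the estimator lambda above: output cardinality = input cardinality.
    static final ToLongFunction<long[]> ONE_TO_ONE = inputCards -> inputCards[0];
}
```

With f1 = x + 1 and f2 = y * 2, mapping [1, 2, 3] yields [4, 6, 8], and the estimate for an input of 3 elements is 3.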
Study MapOperator (wayang-commons/wayang-basic/.../operators/MapOperator.java) as the canonical reference. It shows how to wrap a TransformationDescriptor, provide a copy constructor, and wire up a cardinality estimator.
Step 2 — Implement an execution operator
Execution operators live inside a platform module (e.g., wayang-platforms/wayang-spark). They extend the logical operator and implement the platform's execution-operator interface (SparkExecutionOperator for Spark):
```java
package com.example.wayang.spark.operators;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.optimizer.OptimizationContext;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
import org.apache.wayang.core.platform.ChannelDescriptor;
import org.apache.wayang.core.platform.ChannelInstance;
import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.spark.channels.RddChannel;
import org.apache.wayang.spark.execution.SparkExecutor;
import org.apache.wayang.spark.operators.SparkExecutionOperator;
import com.example.wayang.operators.DoubleMapOperator;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Spark implementation of {@link DoubleMapOperator}: applies both functions
 * as two chained RDD map transformations.
 */
public class SparkDoubleMapOperator<InputType, MidType, OutputType>
        extends DoubleMapOperator<InputType, MidType, OutputType>
        implements SparkExecutionOperator {

    public SparkDoubleMapOperator(DoubleMapOperator<InputType, MidType, OutputType> that) {
        super(that.getFirstDescriptor(), that.getSecondDescriptor(),
                that.getInputType(), that.getOutputType());
    }

    @Override
    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
            ChannelInstance[] inputs,
            ChannelInstance[] outputs,
            SparkExecutor sparkExecutor,
            OptimizationContext.OperatorContext operatorContext) {
        RddChannel.Instance input = (RddChannel.Instance) inputs[0];
        RddChannel.Instance output = (RddChannel.Instance) outputs[0];

        // Compile both UDFs into Spark functions.
        Function<InputType, MidType> f1 =
                sparkExecutor.getCompiler().compile(this.getFirstDescriptor(),
                        this, operatorContext, inputs);
        Function<MidType, OutputType> f2 =
                sparkExecutor.getCompiler().compile(this.getSecondDescriptor(),
                        this, operatorContext, inputs);

        // Two chained transformations; Spark evaluates them lazily.
        JavaRDD<OutputType> result = input.<InputType>provideRdd()
                .map(f1)
                .map(f2);
        this.name(result);
        output.accept(result, sparkExecutor);

        return ExecutionOperator.modelLazyExecution(inputs, outputs, operatorContext);
    }

    @Override
    public String getLoadProfileEstimatorConfigurationKey() {
        return "com.example.spark.doublemap.load";
    }

    @Override
    public Optional<LoadProfileEstimator> createLoadProfileEstimator(
            Configuration configuration) {
        return Optional.ofNullable(
                LoadProfileEstimators.createFromSpecification(
                        this.getLoadProfileEstimatorConfigurationKey(), configuration));
    }

    @Override
    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
        return Arrays.asList(RddChannel.UNCACHED_DESCRIPTOR,
                RddChannel.CACHED_DESCRIPTOR);
    }

    @Override
    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
        return Collections.singletonList(RddChannel.UNCACHED_DESCRIPTOR);
    }

    @Override
    public boolean containsAction() {
        // map is a transformation, not a Spark action
        return false;
    }

    @Override
    protected ExecutionOperator createCopy() {
        return new SparkDoubleMapOperator<>(this);
    }
}
```
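Because Spark's map is a lazy transformation, the two chained map calls above do no work until a downstream action runs, which is consistent with containsAction() returning false. Java streams behave the same way, so the analogy below (java.util.stream standing in for JavaRDD; LazyChainDemo is a hypothetical name) makes that laziness observable by counting UDF invocations.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream stands in for Spark's lazily evaluated JavaRDD.
class LazyChainDemo {
    // Counts how many times either "UDF" has actually run.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Like input.provideRdd().map(f1).map(f2): builds the pipeline, runs nothing yet.
    static Stream<Integer> buildPipeline(List<Integer> input) {
        return input.stream()
                .map(x -> { CALLS.incrementAndGet(); return x + 1; })   // "f1"
                .map(x -> { CALLS.incrementAndGet(); return x * 10; }); // "f2"
    }

    // The terminal operation plays the role of a Spark action: only now do f1 and f2 run.
    static List<Integer> materialize(Stream<Integer> pipeline) {
        return pipeline.collect(Collectors.toList());
    }
}
```

Right after buildPipeline(...) the counter is still 0; after materialize(...) on three elements it is 6 (two calls per element).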
Channel types reference
| Channel class | Module | When to use |
|---|---|---|
| `RddChannel` | wayang-spark | Default Spark channel; carries a `JavaRDD<T>`. |
| `DatasetChannel` | wayang-spark | Schema-aware Spark `Dataset<Row>`. Use for Parquet/lakehouse pipelines. |
| `StreamChannel` | wayang-java | Java `Stream<T>` channel for the Java platform. |
| `CollectionChannel` | wayang-java | An in-memory `Collection<T>`, platform-neutral. |
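Channel compatibility comes down to the overlap between what a producer offers and what a consumer accepts (compare getSupportedInputChannels and getSupportedOutputChannels in the Spark operator). The toy model below is a simplification, not Wayang's algorithm: it assumes list order expresses preference, and an empty result stands for the case where the optimizer would need a ChannelConversion, such as those a plugin contributes. The enum constants are illustrative labels, not Wayang's descriptors.

```java
import java.util.List;
import java.util.Optional;

// Toy model (NOT Wayang API) of matching a consumer's accepted channels
// against the channels a producer can offer.
class ChannelMatchDemo {
    enum Kind { RDD_UNCACHED, RDD_CACHED, DATASET, STREAM, COLLECTION }

    // Assumption: the consumer lists accepted kinds in preference order.
    // Empty result = no direct match, so a channel conversion would be needed.
    static Optional<Kind> pick(List<Kind> acceptedByConsumer, List<Kind> offeredByProducer) {
        return acceptedByConsumer.stream()
                .filter(offeredByProducer::contains)
                .findFirst();
    }
}
```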
Step 3 — Create a Mapping
A Mapping implements org.apache.wayang.core.mapping.Mapping and returns a list of PlanTransformation objects. Each PlanTransformation pairs a SubplanPattern (what to match) with a ReplacementSubplanFactory (what to create).
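To make the two halves concrete, here is a toy model (not Wayang API) of a pattern plus a replacement factory. One assumption is labeled explicitly: the model treats the boolean passed to OperatorPattern (false in the DoubleMapMapping example) as a switch between exact-class and subclass matching. In this model, exact matching matters because the execution operator extends the logical one, so a subclass-matching pattern would also match the replacement it just produced. All class names here are hypothetical.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Toy model (NOT Wayang API) of the two halves of a PlanTransformation.
class PatternFactoryDemo {
    static class Op { int epoch; }
    static class LogicalDoubleMap extends Op { }
    // Like SparkDoubleMapOperator: the execution operator extends the logical one.
    static class SparkDoubleMap extends LogicalDoubleMap {
        SparkDoubleMap(LogicalDoubleMap matched) {
            // a real execution operator would copy descriptors/types from `matched`
        }
    }

    // Assumption: mirrors OperatorPattern's boolean flag for subclass matching.
    static boolean matches(Class<? extends Op> example, boolean matchSubclasses, Op candidate) {
        return matchSubclasses
                ? example.isInstance(candidate)
                : candidate.getClass() == example;
    }

    // Mirrors (matched, epoch) -> new SparkDoubleMapOperator<>(matched).at(epoch).
    static final BiFunction<LogicalDoubleMap, Integer, Op> FACTORY = (matched, epoch) -> {
        SparkDoubleMap replacement = new SparkDoubleMap(matched);
        replacement.epoch = epoch;
        return replacement;
    };

    // Pattern first, factory second; empty when the node does not match.
    static Optional<Op> transform(Op node, int epoch) {
        if (!matches(LogicalDoubleMap.class, false, node)) {
            return Optional.empty();
        }
        return Optional.of(FACTORY.apply((LogicalDoubleMap) node, epoch));
    }
}
```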
The mapping below follows the structure of the Spark platform's FilterMapping, the clearest real-world example:
```java
package com.example.wayang.spark.mapping;

import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.mapping.OperatorPattern;
import org.apache.wayang.core.mapping.PlanTransformation;
import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
import org.apache.wayang.core.mapping.SubplanPattern;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.spark.platform.SparkPlatform;
import com.example.wayang.operators.DoubleMapOperator;
import com.example.wayang.spark.operators.SparkDoubleMapOperator;

import java.util.Collection;
import java.util.Collections;

/**
 * Maps logical {@link DoubleMapOperator} to {@link SparkDoubleMapOperator}.
 */
@SuppressWarnings("unchecked")
public class DoubleMapMapping implements Mapping {

    @Override
    public Collection<PlanTransformation> getTransformations() {
        return Collections.singleton(new PlanTransformation(
                this.createSubplanPattern(),
                this.createReplacementSubplanFactory(),
                SparkPlatform.getInstance() // restrict to Spark
        ));
    }

    private SubplanPattern createSubplanPattern() {
        // Match any DoubleMapOperator node in the plan
        final OperatorPattern pattern = new OperatorPattern(
                "doubleMap",
                new DoubleMapOperator<>(null, null,
                        DataSetType.none(), DataSetType.none()),
                false
        );
        return SubplanPattern.createSingleton(pattern);
    }

    private ReplacementSubplanFactory createReplacementSubplanFactory() {
        return new ReplacementSubplanFactory.OfSingleOperators<DoubleMapOperator>(
                (matched, epoch) -> new SparkDoubleMapOperator<>(matched).at(epoch)
        );
    }
}
```
PlanTransformation carries targetPlatforms so the optimizer knows which platform the replacement targets. Passing SparkPlatform.getInstance() ensures the mapping is only applied when the Spark platform is active in the current context.
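The eligibility rule described in this paragraph can be modeled in a few lines. This is a sketch of the described behavior, not Wayang's code; in particular, treating a transformation with no target platforms as always eligible is an assumption of the toy model, and the platform names are plain strings chosen for illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model (NOT Wayang API) of platform-restricted transformations.
// Assumption: no target platforms = always eligible; otherwise every
// target platform must be active in the current context.
class PlatformFilterDemo {
    static final class Transformation {
        final String name;
        final Set<String> targetPlatforms;

        Transformation(String name, Set<String> targetPlatforms) {
            this.name = name;
            this.targetPlatforms = targetPlatforms;
        }
    }

    static List<String> eligible(List<Transformation> all, Set<String> activePlatforms) {
        return all.stream()
                .filter(t -> t.targetPlatforms.isEmpty()
                        || activePlatforms.containsAll(t.targetPlatforms))
                .map(t -> t.name)
                .collect(Collectors.toList());
    }
}
```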
Internally, PlanTransformation.transform(WayangPlan, epoch) does two things, after which the optimizer takes over:
- It runs SubplanPattern.match(plan, epoch - 1) to collect all matching subgraphs.
- It wraps each match in an OperatorAlternative, adding a new choice to the plan graph without removing the original.

The optimizer then picks the lowest-cost alternative during plan enumeration.
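The match, wrap, and choose stages can be pictured with a toy model (not Wayang API): each mapping whose pattern matches adds one more option next to the original node, and a stand-in optimizer then picks the cheapest executable option. The fixed per-node costs are invented for illustration; Wayang derives real costs from its cardinality and load-profile estimators, and all names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model (NOT Wayang API) of "add alternatives, then pick the cheapest".
class AlternativeDemo {
    static final class Node {
        final String kind;      // e.g. "DoubleMap"
        final String platform;  // null means logical, i.e. not executable
        final double cost;      // toy cost

        Node(String kind, String platform, double cost) {
            this.kind = kind;
            this.platform = platform;
            this.cost = cost;
        }

        boolean executable() {
            return platform != null;
        }
    }

    static final class Mapping {
        final Predicate<Node> pattern;
        final Function<Node, Node> replacement;

        Mapping(Predicate<Node> pattern, Function<Node, Node> replacement) {
            this.pattern = pattern;
            this.replacement = replacement;
        }
    }

    // Match and wrap: every matching mapping adds one option; the original stays.
    static List<List<Node>> transform(List<Node> plan, List<Mapping> mappings) {
        List<List<Node>> slots = new ArrayList<>();
        for (Node node : plan) {
            List<Node> options = new ArrayList<>();
            options.add(node);
            for (Mapping mapping : mappings) {
                if (mapping.pattern.test(node)) {
                    options.add(mapping.replacement.apply(node));
                }
            }
            slots.add(options);
        }
        return slots;
    }

    // Choose: the cheapest executable option in each slot.
    static List<String> pickCheapest(List<List<Node>> slots) {
        List<String> chosen = new ArrayList<>();
        for (List<Node> options : slots) {
            Node best = options.stream()
                    .filter(Node::executable)
                    .min(Comparator.comparingDouble((Node n) -> n.cost))
                    .orElseThrow(() -> new IllegalStateException("no executable option"));
            chosen.add(best.platform + ":" + best.kind);
        }
        return chosen;
    }
}
```

With a Spark mapping (cost 5.0) and a Java mapping (cost 2.0) both matching one logical DoubleMap node, the slot holds three options and the toy optimizer chooses "java:DoubleMap".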
Step 4 — Bundle in a Plugin and register
A Plugin groups platforms, mappings, and channel conversions so they can be loaded with a single withPlugin(...) call.
```java
package com.example.wayang.spark;

import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.mapping.Mapping;
import org.apache.wayang.core.optimizer.channels.ChannelConversion;
import org.apache.wayang.core.platform.Platform;
import org.apache.wayang.core.plugin.Plugin;
import org.apache.wayang.spark.platform.SparkPlatform;
import com.example.wayang.spark.mapping.DoubleMapMapping;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class ExampleSparkPlugin implements Plugin {

    @Override
    public Collection<Platform> getRequiredPlatforms() {
        return Collections.singletonList(SparkPlatform.getInstance());
    }

    @Override
    public Collection<Mapping> getMappings() {
        return Arrays.asList(new DoubleMapMapping());
    }

    @Override
    public Collection<ChannelConversion> getChannelConversions() {
        return Collections.emptyList();
    }

    @Override
    public void setProperties(Configuration configuration) {
        // Provide default property values here if needed.
        // Replace "..." with an actual load-profile specification for the key
        // returned by SparkDoubleMapOperator#getLoadProfileEstimatorConfigurationKey().
        configuration.setProperty("com.example.spark.doublemap.load", "...");
    }
}
```
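The configuration key returned by getLoadProfileEstimatorConfigurationKey() and the property set in setProperties(...) have to agree; otherwise the operator has no specification to build a load-profile estimator from. The sketch below models that hand-off with java.util.Properties standing in for Wayang's Configuration. LoadProfileKeyDemo and its methods are hypothetical, and the specification string is a placeholder, not a real load-profile format.

```java
import java.util.Optional;
import java.util.Properties;

// Toy model (NOT Wayang API): an operator looks up its load-profile
// specification under a key; the plugin's setProperties(...) supplies a default.
class LoadProfileKeyDemo {
    static final String KEY = "com.example.spark.doublemap.load";

    // Stand-in for Plugin#setProperties: register a default specification.
    static void setDefaults(Properties config, String specification) {
        config.setProperty(KEY, specification);
    }

    // Stand-in for createLoadProfileEstimator: empty when nothing is registered,
    // in the spirit of the Optional.ofNullable(...) wrapper in SparkDoubleMapOperator.
    static Optional<String> lookUpSpecification(Properties config) {
        return Optional.ofNullable(config.getProperty(KEY));
    }
}
```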
Register the plugin when building your WayangContext:
```java
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.spark.Spark;
import com.example.wayang.spark.ExampleSparkPlugin;

WayangContext wayang = new WayangContext()
        .withPlugin(Spark.basicPlugin())
        .withPlugin(new ExampleSparkPlugin());
```
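Each withPlugin(...) call adds to the context and returns it, which is why the calls chain. A likely reason to register both plugins is that ExampleSparkPlugin contributes only DoubleMapMapping, while Spark.basicPlugin() supplies the mappings for Wayang's built-in Spark operators. The toy builder below (hypothetical names, not Wayang API) illustrates that accumulation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Toy model (NOT Wayang API) of fluent plugin registration.
class PluginRegistrationDemo {
    interface ToyPlugin {
        Collection<String> mappings();
    }

    static final class ToyContext {
        private final List<String> mappings = new ArrayList<>();

        // Adds the plugin's mappings and returns this context for chaining.
        ToyContext withPlugin(ToyPlugin plugin) {
            mappings.addAll(plugin.mappings());
            return this;
        }

        List<String> mappings() {
            return mappings;
        }
    }
}
```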
Build and test
Compile only the module you changed for faster iteration:
```bash
./mvnw clean install -DskipTests -pl wayang-platforms/wayang-spark
```
Before opening a pull request, verify that the full build and all tests pass:

```bash
./mvnw clean install
```
Package a distribution to run your operator via wayang-submit:

```bash
./mvnw clean package -pl :wayang-assembly -Pdistribution
cd wayang-assembly/target/
tar -xvf apache-wayang-assembly-*-dist.tar.gz
cd wayang-*/
./bin/wayang-submit com.example.MyJob my-arg
```
Always run ./mvnw clean install (without -DskipTests) before submitting a PR. Skipping tests locally is fine for rapid iteration but can hide serialization and cardinality estimator regressions.
Reference: existing operators as patterns
| Logical operator | Execution operator | Mapping |
|---|---|---|
| `MapOperator` | `SparkMapOperator` | wayang-spark/.../mapping/MapMapping.java |
| `FilterOperator` | `SparkFilterOperator` | wayang-spark/.../mapping/FilterMapping.java |
| `ParquetSource` | `SparkParquetSource` | wayang-spark/.../mapping/ParquetSourceMapping.java |
| `CollectionSource` | `SparkCollectionSource` | wayang-spark/.../mapping/CollectionSourceMapping.java |
Start by copying FilterMapping — it is the shortest complete example of the pattern-match-and-replace lifecycle.