
One of Apache Wayang’s defining capabilities is that you don’t pick a platform — the optimizer does. Given a logical plan and a set of registered plugins, the optimizer assigns every operator to the platform where it will run cheapest, and it can even split a single job across multiple engines within the same execution. This page explains the mechanics: how data sizes are estimated, how costs are computed, how the search space is pruned, and how the final assignment is made.

The Cost Model at a Glance

The optimizer works in four phases:
  1. Cardinality estimation — estimate how many records flow through every edge in the plan.
  2. Load estimation — for each (operator, platform) pair, compute a LoadProfile (CPU, memory, disk, network).
  3. Plan enumeration — expand the logical plan into candidate physical plans by substituting each logical operator with one or more execution operators.
  4. Pruning and selection — discard dominated candidates, squash uncertainty intervals to scalars, and pick the plan with the lowest total cost.
All cost values are represented as ProbabilisticDoubleInterval — an interval with an associated correctness probability — so the optimizer can reason about uncertainty rather than pretending estimates are exact.

Cardinality Estimation

Every logical operator that contributes a CardinalityEstimator tells the optimizer how its output cardinality relates to its input cardinalities. Estimates are represented as CardinalityEstimate(lowerBound, upperBound, correctnessProbability).
```java
// CardinalityEstimator: the single method every estimator implements
@FunctionalInterface
public interface CardinalityEstimator extends Serializable {
    CardinalityEstimate estimate(
        OptimizationContext optimizationContext,
        CardinalityEstimate... inputEstimates
    );
}
```
How individual operators estimate cardinality:
  • MapOperator — output cardinality equals input cardinality (1:1 transformation). Its estimator is a DefaultCardinalityEstimator that passes the input cardinality through unchanged, with a certainty of 1.0.
  • FilterOperator — reads a ProbabilisticDoubleInterval selectivity from the Configuration for the given PredicateDescriptor. Output range is [lower × selectivityLower, upper × selectivityUpper] with combined correctness probability.
  • TextFileSource — samples the first 1 MiB of the file, measures average bytes per line, and extrapolates to the full file size with a correctness probability of 0.95.
  • FlatMapOperator, JoinOperator, GroupByOperator — each supplies its own estimator appropriate to the semantics.
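The per-operator rules above can be sketched in plain Java to make the interval arithmetic concrete. This is a minimal illustration, not Wayang's API: `Estimate`, `textFileSource`, `map`, and `filter` are hypothetical names, the average line length is passed in rather than sampled from the file, and multiplying the input and selectivity probabilities in `filter` is an assumption about how the combined correctness probability is formed.

```java
/** Hypothetical sketch of forward cardinality propagation; not Wayang's classes. */
class CardinalitySketch {

    /** [lower, upper] record count with a correctness probability. */
    record Estimate(long lower, long upper, double prob) {}

    /** TextFileSource-style estimate: file size divided by the sampled average
     *  line length, reported with correctness probability 0.95. */
    static Estimate textFileSource(long fileSizeBytes, double avgBytesPerLine) {
        long lines = Math.round(fileSizeBytes / avgBytesPerLine);
        return new Estimate(lines, lines, 0.95);
    }

    /** MapOperator: 1:1, so the input estimate passes through unchanged. */
    static Estimate map(Estimate in) {
        return in;
    }

    /** FilterOperator: scale each bound by the matching selectivity bound.
     *  Assumption: probabilities are multiplied, i.e. treated as independent. */
    static Estimate filter(Estimate in, double selLower, double selUpper, double selProb) {
        return new Estimate(
                Math.round(in.lower() * selLower),
                Math.round(in.upper() * selUpper),
                in.prob() * selProb);
    }

    public static void main(String[] args) {
        Estimate src = textFileSource(10L * 1024 * 1024, 128.0); // 10 MiB, 128 bytes per line
        Estimate filtered = filter(map(src), 0.1, 0.3, 0.8);
        System.out.println(src + " -> " + filtered);
    }
}
```

A 10 MiB file at 128 bytes per line gives 81,920 lines; a filter with selectivity [0.1, 0.3] then narrows this to [8,192, 24,576] records.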
Estimates propagate forward through the plan: source estimates flow into transformation estimates, which flow into aggregation estimates, and so on. Operators in loops are handled by the LoopContext, which manages per-iteration OptimizationContexts and can refine estimates as the loop progresses.
Wayang stores observed actual cardinalities in a CardinalityRepository. Over time, this data is used to improve future estimates for the same operator types and UDF signatures — a lightweight form of learned statistics.
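To illustrate the idea of learned statistics, here is a hypothetical sketch that remembers observed output/input ratios per operator signature and hands them back as a selectivity interval. It does not reflect how CardinalityRepository is actually implemented or queried; the class, the signature strings, and the min/max policy are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical "learned statistics" sketch; NOT Wayang's CardinalityRepository. */
class ObservedSelectivitySketch {

    /** Running min/max of observed output/input ratios for one signature. */
    private static final class Stats {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    private final Map<String, Stats> bySignature = new HashMap<>();

    /** Record one execution: how many records went in and came out. */
    void observe(String signature, long inCard, long outCard) {
        if (inCard <= 0) return;                 // an empty input teaches us nothing
        double ratio = (double) outCard / inCard;
        Stats s = bySignature.computeIfAbsent(signature, k -> new Stats());
        s.min = Math.min(s.min, ratio);
        s.max = Math.max(s.max, ratio);
    }

    /** Selectivity interval [min, max] seen so far, or the fallback if never observed. */
    double[] selectivity(String signature, double fallbackLower, double fallbackUpper) {
        Stats s = bySignature.get(signature);
        if (s == null) return new double[] {fallbackLower, fallbackUpper};
        return new double[] {s.min, s.max};
    }

    public static void main(String[] args) {
        ObservedSelectivitySketch repo = new ObservedSelectivitySketch();
        repo.observe("Filter[isError]", 1_000, 50);
        repo.observe("Filter[isError]", 2_000, 200);
        double[] sel = repo.selectivity("Filter[isError]", 0.0, 1.0);
        System.out.println(sel[0] + " .. " + sel[1]);
    }
}
```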

ProbabilisticDoubleInterval: Uncertainty-Aware Costs

All cost quantities — selectivities, load profiles, time estimates, and final cost estimates — are represented as ProbabilisticDoubleInterval:
```java
// An interval [lowerEstimate, upperEstimate] with a correctness probability in [0, 1].
ProbabilisticDoubleInterval interval = new ProbabilisticDoubleInterval(
    100_000,   // lower bound
    500_000,   // upper bound
    0.90       // 90% probability that the true value falls in this range
);

// Exact value (zero-width interval, probability = 1)
ProbabilisticDoubleInterval exact = ProbabilisticDoubleInterval.ofExactly(42.0);
```
When two intervals are combined (e.g., adding costs across two operators), the resulting interval’s bounds are added and the combined correctness probability is taken as the minimum of the two. This means a long chain of uncertain estimates naturally produces a wider interval with lower overall confidence — which is what you’d expect. The squasher (configured as wayang.core.optimizer.cost.squasher) collapses an interval to a single scalar for the final comparison. The default squasher is the geometric mean: sqrt(lower × upper).
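The combination rule and the default squasher can be sketched as follows. `Interval` is a hypothetical stand-in for ProbabilisticDoubleInterval; only the add-the-bounds, keep-the-minimum-probability rule and the geometric-mean squash are taken from the description above.

```java
/** Hypothetical stand-in for ProbabilisticDoubleInterval arithmetic. */
class IntervalSketch {

    record Interval(double lower, double upper, double prob) {

        /** Add the bounds; keep the weaker (minimum) correctness probability. */
        Interval plus(Interval that) {
            return new Interval(lower + that.lower, upper + that.upper,
                                Math.min(prob, that.prob));
        }

        /** Geometric-mean squasher: sqrt(lower * upper). */
        double squash() {
            return Math.sqrt(lower * upper);
        }
    }

    public static void main(String[] args) {
        Interval a = new Interval(100, 400, 0.9);
        Interval b = new Interval(50, 100, 0.8);
        Interval sum = a.plus(b);
        System.out.println(sum + " squashed to " + sum.squash());
    }
}
```

Adding [100, 400] at 0.9 to [50, 100] at 0.8 yields [150, 500] at 0.8, which squashes to sqrt(75,000) ≈ 273.9.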

Load Estimation: CPU, Memory, and Network

Once cardinality estimates are available, the optimizer estimates the actual resource consumption of each execution operator using a LoadProfileEstimator. A LoadProfile contains:
Resource | Unit
---|---
CPU load | operations (scaled by frequency)
RAM load | bytes
Disk I/O | bytes
Network I/O | bytes
The LoadProfile is converted to a TimeEstimate (wall-clock time with a correctness interval) using the platform’s LoadProfileToTimeConverter, and then to a cost using the platform’s TimeToCostConverter. Different platforms have very different cost profiles:
  • Java Streams — minimal startup overhead; cost scales linearly with data size. Cheap for small data.
  • Apache Spark — significant startup overhead (JVM initialization, SparkContext creation, task scheduling). Worth it only when data is large enough that the parallel speedup outweighs the startup cost.
  • Apache Flink — similar structure to Spark; optimized for streaming but also effective for large batch workloads.
  • PostgreSQL / SQLite — near-zero startup; highly efficient for filter-heavy workloads that the database's own query engine can evaluate in place.
For small datasets the optimizer almost always picks the local Java engine because Spark’s startup cost dominates. This is the optimizer working correctly, not ignoring Spark. Register both plugins in production: the optimizer will route large operators to Spark automatically once the data grows.
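The break-even behaviour described above can be reproduced with a toy model. This sketch assumes a simple converter (fixed startup overhead plus CPU and network time at a constant throughput) and a linear time-to-cost price; the `Platform` parameters are illustrative numbers, not Wayang's calibrated defaults, and RAM and disk load are carried in the profile but ignored by the converter.

```java
/** Toy LoadProfile -> time -> cost model; all names and numbers are hypothetical. */
class PlatformCostSketch {

    /** Resource consumption of one operator (units as in the table above). */
    record LoadProfile(double cpuOps, double ramBytes, double diskBytes, double networkBytes) {}

    /** Hypothetical per-platform converters bundled in one record. */
    record Platform(String name, double startupMillis, double cpuOpsPerMilli,
                    double networkBytesPerMilli, double costPerMilli) {

        /** LoadProfile -> time (ms). RAM and disk are ignored in this toy converter. */
        double timeMillis(LoadProfile load) {
            return startupMillis
                    + load.cpuOps() / cpuOpsPerMilli
                    + load.networkBytes() / networkBytesPerMilli;
        }

        /** Time -> cost, assuming a linear price per millisecond. */
        double cost(LoadProfile load) {
            return timeMillis(load) * costPerMilli;
        }
    }

    /** Hypothetical load: 10 CPU operations per record and no network traffic. */
    static LoadProfile loadFor(long records) {
        return new LoadProfile(10.0 * records, 0, 0, 0);
    }

    /** Name of the platform with the lower cost for this many records. */
    static String cheaper(Platform a, Platform b, long records) {
        LoadProfile load = loadFor(records);
        return a.cost(load) <= b.cost(load) ? a.name() : b.name();
    }

    // Illustrative numbers only: Java starts fast, Spark has high startup but more throughput.
    static final Platform JAVA = new Platform("java", 10, 1_000, 1_000_000, 1.0);
    static final Platform SPARK = new Platform("spark", 5_000, 20_000, 1_000_000, 1.0);

    public static void main(String[] args) {
        for (long n : new long[] {1_000, 100_000, 10_000_000}) {
            System.out.println(n + " records -> " + cheaper(JAVA, SPARK, n));
        }
    }
}
```

With these numbers Java wins for 1,000 and 100,000 records and Spark wins for 10,000,000; the crossover sits near 525,000 records, where 10 + 0.01·n equals 5000 + 0.0005·n.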

Plan Enumeration

The plan enumerator expands every logical operator into the set of execution operators provided by registered plugins. For a plan with n operators, each with k platform options, the search space is k^n candidate plans. Two strategies keep this tractable:
  1. Operator-by-operator expansion — the enumerator builds partial plans left-to-right, pruning dominated partial assignments before expanding the next operator.
  2. Pruning strategies (PlanEnumerationPruningStrategy) — pluggable strategies discard any partial plan whose lower-bound cost already exceeds the best known complete plan. The default strategy is LatentOperatorPruningStrategy.
Channel conversion costs are added whenever two adjacent operators in a candidate plan are assigned to different platforms. This naturally penalizes plans that bounce data between engines too frequently.
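The pruning idea can be sketched as a depth-first branch-and-bound over a linear chain of operators. This is an illustrative stand-in, not Wayang's PlanEnumerator or LatentOperatorPruningStrategy: costs are plain doubles, the lower bound of a partial plan is simply its cost so far (valid because every cost is non-negative), and a flat conversion cost is charged whenever adjacent operators switch platform.

```java
import java.util.Arrays;

/** Hypothetical branch-and-bound enumeration over a linear operator chain. */
class EnumerationSketch {

    private final double[][] opCost;      // opCost[i][p]: cost of operator i on platform p
    private final double conversionCost;  // charged when adjacent operators switch platform
    private double bestCost = Double.POSITIVE_INFINITY;
    private int[] bestPlan;
    int expanded = 0;                     // partial plans visited, to show the effect of pruning

    EnumerationSketch(double[][] opCost, double conversionCost) {
        this.opCost = opCost;
        this.conversionCost = conversionCost;
    }

    int[] search() {
        extend(new int[opCost.length], 0, 0.0);
        return bestPlan;
    }

    double bestCost() {
        return bestCost;
    }

    private void extend(int[] plan, int i, double costSoFar) {
        expanded++;
        if (costSoFar >= bestCost) return;          // prune: cannot beat the best complete plan
        if (i == opCost.length) {                   // complete plan that beats the incumbent
            bestCost = costSoFar;
            bestPlan = plan.clone();
            return;
        }
        for (int p = 0; p < opCost[i].length; p++) {
            double step = opCost[i][p];
            if (i > 0 && plan[i - 1] != p) step += conversionCost;
            plan[i] = p;
            extend(plan, i + 1, costSoFar + step);
        }
    }

    public static void main(String[] args) {
        // Columns: platform 0 = "java", platform 1 = "spark" (illustrative costs).
        double[][] costs = {
            {1, 50},     // source
            {2, 40},     // filter
            {500, 60},   // join on large inputs
        };
        EnumerationSketch e = new EnumerationSketch(costs, 20);
        int[] plan = e.search();
        System.out.println(Arrays.toString(plan) + " cost=" + e.bestCost()
                + " expanded=" + e.expanded);
    }
}
```

With the sample costs the search returns [0, 0, 1] at cost 83: source and filter on Java, the join on Spark, plus one conversion.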

How the Optimizer Chooses Platforms

The optimizer assigns a squashedCostEstimate to every candidate complete plan and selects the plan with the smallest value. The key comparison is:
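$$
\mathrm{totalCost}(\mathit{plan}) = \sum_{op \in \mathit{plan}} \mathrm{operatorCost}\big(op, \mathrm{platform}(op)\big) + \sum_{\substack{(u \to v) \in \mathit{plan} \\ \mathrm{platform}(u) \neq \mathrm{platform}(v)}} \mathrm{channelConversionCost}\big(\mathrm{platform}(u) \to \mathrm{platform}(v)\big)
$$
where the second sum runs over every edge (u → v) whose endpoints are assigned to different platforms.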
Each operatorCost is obtained by the OperatorContext.updateCostEstimate() path, which calls LoadProfileEstimators.estimateLoadProfile(), converts to time via LoadProfileToTimeConverter, and converts to cost via TimeToCostConverter.
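```java
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
// Registering both Java and Spark gives the optimizer a choice for every operator
WayangContext wayang = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin())
        .withPlugin(Spark.basicPlugin());
```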
With both plugins registered, the optimizer might decide:
  • Run the TextFileSource and FilterOperator on Java (small data, filter is cheap locally).
  • Switch to Spark for a JoinOperator on two large datasets (parallelism pays off).
  • Use a cross-platform channel (temp file on HDFS) to hand data from Java to Spark.

Cross-Platform Execution

When the optimizer selects different platforms for different parts of the plan, the CrossPlatformExecutor orchestrates the hand-off. It groups execution tasks into ExecutionStage blocks: a stage boundary exists wherever two adjacent tasks belong to different platforms. At a stage boundary, Wayang inserts a ChannelConversion operator to serialize the output of the upstream stage into a format the downstream platform can read. Common conversions include:
  • Java Collection → HDFS text file → Spark RDD
  • Spark RDD → materialized file → Flink DataSet
  • Any platform → JDBC → PostgreSQL table
Channel conversion costs are pre-computed and available to the optimizer, so it only introduces a platform switch when the savings on the downstream operators more than compensate for the serialization overhead.
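Stage formation can be sketched for a linear pipeline: start a new stage whenever the platform changes and record a conversion at each boundary. `Task`, `Stage`, and the conversion strings are hypothetical; Wayang's CrossPlatformExecutor works on a general task graph and inserts real conversion operators rather than labels.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stage-splitting sketch for a linear task pipeline. */
class StageSplitSketch {

    record Task(String name, String platform) {}
    record Stage(String platform, List<String> tasks) {}

    /** Groups consecutive same-platform tasks; appends "from -> to" per boundary. */
    static List<Stage> split(List<Task> pipeline, List<String> conversionsOut) {
        List<Stage> stages = new ArrayList<>();
        Stage current = null;
        for (Task t : pipeline) {
            if (current == null || !current.platform().equals(t.platform())) {
                if (current != null) {
                    // Stage boundary: data must be converted between platforms here.
                    conversionsOut.add(current.platform() + " -> " + t.platform());
                }
                current = new Stage(t.platform(), new ArrayList<>());
                stages.add(current);
            }
            current.tasks().add(t.name());
        }
        return stages;
    }

    public static void main(String[] args) {
        List<String> conversions = new ArrayList<>();
        List<Stage> stages = split(List.of(
                new Task("TextFileSource", "java"),
                new Task("Filter", "java"),
                new Task("Join", "spark"),
                new Task("Collect", "spark")), conversions);
        stages.forEach(System.out::println);
        System.out.println("conversions: " + conversions);
    }
}
```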

Adaptive Re-Optimization in Loops

When the plan contains a doWhile or repeat operator, Wayang can observe actual output cardinalities after each iteration and compare them with estimates. If the deviation exceeds a configured threshold (a CardinalityBreakpoint), the executor re-invokes the optimizer to produce a revised plan for subsequent iterations. This is especially valuable for iterative algorithms like PageRank or K-Means where convergence speed is data-dependent.
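The breakpoint check can be sketched as a comparison between an iteration's observed cardinality and its estimate. The criterion here (re-optimize when the observation falls outside the estimated interval widened by a relative tolerance) is an assumption for illustration; the actual CardinalityBreakpoint rules and their configuration may differ.

```java
/** Hypothetical re-optimization trigger; not Wayang's CardinalityBreakpoint. */
class BreakpointSketch {

    record Estimate(long lower, long upper) {}

    /** True if the observed cardinality lies outside the tolerance-widened estimate. */
    static boolean shouldReoptimize(Estimate est, long observed, double tolerance) {
        double lo = est.lower() * (1.0 - tolerance);
        double hi = est.upper() * (1.0 + tolerance);
        return observed < lo || observed > hi;
    }

    public static void main(String[] args) {
        Estimate est = new Estimate(10_000, 20_000);
        long[] observedPerIteration = {15_000, 21_000, 60_000};
        for (int i = 0; i < observedPerIteration.length; i++) {
            long obs = observedPerIteration[i];
            System.out.println("iteration " + i + ": observed " + obs
                    + (shouldReoptimize(est, obs, 0.1) ? " -> re-optimize" : " -> keep plan"));
        }
    }
}
```

With an estimate of [10,000, 20,000] and a 10% tolerance, observations of 15,000 and 21,000 keep the plan, while 60,000 triggers re-optimization.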

Customizing the Cost Model

For advanced use cases, you can replace the default cost model entirely by implementing EstimatableCost and registering it with the Configuration:
```java
import org.apache.wayang.core.optimizer.costs.EstimatableCost;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

public class MyCustomCost implements EstimatableCost {
    // Implement getEstimate(), getParallelEstimate(), getSquashedEstimate(),
    // getSquashedParallelEstimate(), pickBestExecutionPlan(), getFactory(),
    // and getParallelOperatorJunctionAllCostEstimate(),
    // using any model, including an ML model loaded from disk.
}

class CustomCostSetup {
    public static void main(String[] args) {
        // Swap the default cost model for the custom one before building the context.
        Configuration config = new Configuration();
        config.setCostModel(new MyCustomCost());

        WayangContext wayang = new WayangContext(config)
                .withPlugin(Java.basicPlugin())
                .withPlugin(Spark.basicPlugin());
    }
}
```
The wayang-ml module ships a reference implementation (OrtMLModel) that loads an ONNX model and uses a one-hot encoding of the plan graph to predict execution time — a drop-in replacement for the formula-based cost estimator.
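As a rough illustration of plan encoding, the sketch below sums one-hot vectors of (operator, platform) pairs into a fixed-length feature vector that a model could consume. It is a simplified, hypothetical bag-of-operators encoding: the operator and platform vocabularies are made up, and it discards the plan's edges, whereas the wayang-ml encoding is described as an encoding of the plan graph.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical bag-of-operators encoding; not the wayang-ml implementation. */
class OneHotPlanSketch {

    static final List<String> OPERATORS = List.of("TextFileSource", "Filter", "Map", "Join");
    static final List<String> PLATFORMS = List.of("java", "spark");

    record Node(String operator, String platform) {}

    /** One slot per (operator, platform) pair; each node adds 1 to its slot. */
    static double[] encode(List<Node> plan) {
        double[] features = new double[OPERATORS.size() * PLATFORMS.size()];
        for (Node n : plan) {
            int op = OPERATORS.indexOf(n.operator());
            int pf = PLATFORMS.indexOf(n.platform());
            if (op < 0 || pf < 0) {
                throw new IllegalArgumentException("unknown operator or platform: " + n);
            }
            features[op * PLATFORMS.size() + pf] += 1.0;
        }
        return features;
    }

    public static void main(String[] args) {
        double[] x = encode(List.of(
                new Node("TextFileSource", "java"),
                new Node("Filter", "java"),
                new Node("Join", "spark")));
        System.out.println(Arrays.toString(x));
    }
}
```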
