The Apache Spark platform adapter lets Wayang route operators to a Spark cluster (or a local Spark context) without changing a single line of your pipeline code. You register Spark.basicPlugin() alongside any other platforms you have, and Wayang’s cost-based optimizer decides which operators run on Spark and which run elsewhere. Spark’s strength is large-scale batch processing: its in-memory RDD model and cluster scheduler amortize startup overhead over large datasets in a way that local engines cannot match.

Requirements

Before registering the Spark platform, ensure the following are in place:
  • Apache Spark 3.4.4 with Scala 2.12 installed and SPARK_HOME set to the installation directory.
  • Apache Hadoop 3+ with HADOOP_HOME set (required for HDFS and distributed file I/O).
  • Java 17 with JAVA_HOME set. Running on Java 17 requires passing extra JVM flags to open internal modules — see the installation guide for the exact flags.
Wayang is compiled against Spark 3.4.4 / Scala 2.12. Using a different Spark major version or a Scala 2.13 build will cause binary incompatibility errors at runtime.
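The requirements above can be checked programmatically before a job starts. The following is a minimal sketch in plain Java and is not part of Wayang: the class name `SparkPreflight` and both helper methods are hypothetical. The compatibility rule only encodes the statement above (Spark major version 3, Scala binary version 2.12).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Wayang class: checks the prerequisites listed above.
class SparkPreflight {

    // Environment variables named in the requirements list.
    static final List<String> REQUIRED_VARS = List.of("SPARK_HOME", "HADOOP_HOME", "JAVA_HOME");

    /** Returns the required variables that are unset or blank in the given environment. */
    static List<String> missingVars(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED_VARS) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** True when the Spark major version is 3 and the Scala binary version is 2.12. */
    static boolean isBinaryCompatible(String sparkVersion, String scalaVersion) {
        boolean sparkOk = sparkVersion.startsWith("3.");
        boolean scalaOk = scalaVersion.equals("2.12") || scalaVersion.startsWith("2.12.");
        return sparkOk && scalaOk;
    }

    public static void main(String[] args) {
        List<String> missing = missingVars(System.getenv());
        if (!missing.isEmpty()) {
            System.err.println("Missing environment variables: " + missing);
        }
        // Feature version of the running JVM, e.g. 17 on Java 17.
        System.out.println("Java feature version: " + Runtime.version().feature());
    }
}
```

Passing the environment as a `Map` rather than reading `System.getenv()` inside the helper keeps the check easy to unit-test.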

Maven dependency

<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-spark</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
Replace WAYANG_VERSION with the latest release on Maven Central.

Entry-point class: Spark

The Spark class in org.apache.wayang.spark exposes four static factory methods:
import org.apache.wayang.spark.Spark;

// Standard batch operators: map, filter, reduce, join, sort, …
Spark.basicPlugin()

// Graph operators: PageRank
Spark.graphPlugin()

// Channel-conversion rules (RDD ↔ Java Collection, RDD ↔ Dataset, …)
Spark.conversionPlugin()

// Spark ML operators: KMeans, LinearRegression, LogisticRegression, …
Spark.mlPlugin()
basicPlugin() and graphPlugin() are independent — register only the ones you need. conversionPlugin() is included automatically when you register basicPlugin(), but you can add it explicitly if you need fine-grained control over cross-platform transfers.

Supported operators

Spark.basicPlugin() covers all standard Wayang operators on Spark:
  • Sources: TextFileSource, ObjectFileSource, CollectionSource, ParquetSource, KafkaTopicSource
  • Transformations: MapOperator, MapPartitionsOperator, FlatMapOperator, FilterOperator, SortOperator, DistinctOperator, ZipWithIdOperator
  • Aggregations: ReduceByOperator, GlobalReduceOperator, MaterializedGroupByOperator, GlobalMaterializedGroupOperator, CountOperator
  • Multi-input: JoinOperator, CartesianOperator, CoGroupOperator, UnionAllOperator, IntersectOperator
  • Control flow: LoopOperator, DoWhileOperator, RepeatOperator, SampleOperator
  • Sinks: TextFileSink, ObjectFileSink, LocalCallbackSink, KafkaTopicSink, ParquetSink, TableSink
Spark.graphPlugin() adds PageRankOperator. Spark.mlPlugin() adds KMeans, LinearRegression, LogisticRegression, DecisionTreeClassification, DecisionTreeRegression, LinearSVC, ModelTransform, and PredictOperator.

Configuration

wayang-spark-defaults.properties (bundled inside wayang-spark) sets these defaults:
# Spark context settings
spark.master = local[1]
spark.app.name = Wayang App
spark.ui.showConsoleProgress = false
spark.driver.allowMultipleContexts = true

# Cost-model parameters used by the optimizer
wayang.spark.cpu.mhz = 2700
wayang.spark.machines = 1
wayang.spark.cores-per-machine = 2
wayang.spark.hdfs.ms-per-mb = 2.7
wayang.spark.network.ms-per-mb = 8.6
wayang.spark.init.ms = 4500
wayang.spark.stretch = 1
wayang.spark.costs.fix = 0.0
wayang.spark.costs.per-ms = 1.0
wayang.spark.init.ms = 4500 is the estimated startup cost (in milliseconds) the optimizer charges for any plan that uses Spark. This is the primary reason the optimizer avoids Spark for tiny datasets — the 4.5-second fixed cost only pays off when the data volume is large enough to justify distributed execution.
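To make the effect of the fixed startup cost concrete, here is a rough sketch of the break-even arithmetic in plain Java. It is a deliberately simplified toy model and not Wayang's actual cost function: it assumes local execution takes `workMs` on one core and that Spark takes `init.ms` plus the same work divided evenly across `machines × cores-per-machine`. The class and method names are hypothetical. Only the three property keys and their values come from the defaults above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Toy model for illustration only; Wayang's real optimizer is more detailed.
class SparkBreakEven {

    // Subset of the bundled defaults listed above, inlined so the sketch is self-contained.
    static final String DEFAULTS = String.join("\n",
            "wayang.spark.machines = 1",
            "wayang.spark.cores-per-machine = 2",
            "wayang.spark.init.ms = 4500");

    /** Parses key = value lines with the standard java.util.Properties reader. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Number of parallel slots: machines times cores per machine. */
    static int slots(Properties p) {
        return Integer.parseInt(p.getProperty("wayang.spark.machines"))
                * Integer.parseInt(p.getProperty("wayang.spark.cores-per-machine"));
    }

    /** Assumed Spark time: fixed startup cost plus perfectly parallel work. */
    static double sparkMs(Properties p, double workMs) {
        double initMs = Double.parseDouble(p.getProperty("wayang.spark.init.ms"));
        return initMs + workMs / slots(p);
    }

    /** Solves init + w / slots = w for w, the work at which both options cost the same. */
    static double breakEvenWorkMs(Properties p) {
        int slots = slots(p);
        if (slots <= 1) {
            // No parallel speed-up, so the fixed cost is never recovered in this model.
            return Double.POSITIVE_INFINITY;
        }
        double initMs = Double.parseDouble(p.getProperty("wayang.spark.init.ms"));
        return initMs * slots / (slots - 1);
    }

    public static void main(String[] args) {
        Properties p = load(DEFAULTS);
        System.out.println("Break-even work (ms): " + breakEvenWorkMs(p));
    }
}
```

Under these assumptions and the default values, the break-even point is 9000 ms of single-core work. Below that, the 4500 ms startup charge dominates and local execution is cheaper.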

RDD channels vs Dataset channels

By default, Wayang’s Spark backend represents data as Java RDDs (JavaRDD<T>). As of recent releases, Spark Dataset/DataFrame channels are also available for operators that benefit from Spark’s Catalyst optimizer (structured query optimization, columnar reads, Spark ML integration).
  • RDD channels are the default for general-purpose pipelines.
  • Dataset channels are preferable when reading Parquet/Delta, interoperating with Spark ML stages, or using schema-aware processing.
For full details on opting in to Dataset channels, including the readParquet(..., preferDataset = true) API and the automatic RDD ↔ Dataset conversion operators (SparkRddToDatasetOperator, SparkDatasetToRddOperator), see the Spark Dataset Pipelines guide.

Example: register only Spark (force Spark execution)

When you register only Spark, the optimizer has no other choice — every operator runs on Spark. This is useful for integration tests that must verify Spark behavior, or for pipelines that are known to be large and should never fall back to local execution.
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.spark.Spark;
import java.util.Arrays;

public class SparkWordCount {
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        // Use a standalone cluster; remove this line to fall back to the bundled default, local[1].
        conf.setProperty("spark.master", "spark://master:7077");

        // Register ONLY Spark → every operator is forced to run on Spark.
        WayangContext wayang = new WayangContext(conf)
                .withPlugin(Spark.basicPlugin());

        new JavaPlanBuilder(wayang)
                .withJobName("SparkWordCount")
                .withUdfJarOf(SparkWordCount.class)
                .readTextFile("hdfs:///data/input/")
                .flatMap(line -> Arrays.asList(line.split("\\W+")))
                .filter(word -> !word.isEmpty())
                .map(word -> new Tuple2<>(word.toLowerCase(), 1))
                .reduceByKey(
                        Tuple2::getField0,
                        (a, b) -> new Tuple2<>(a.getField0(), a.getField1() + b.getField1()))
                .writeTextFile(
                        "hdfs:///data/output/",
                        t -> t.getField0() + ": " + t.getField1());
    }
}
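For an integration test, it can help to have a plain-Java reference that computes the same word counts as the pipeline above, so the Spark output can be compared against it. The sketch below uses only the JDK and mirrors the pipeline's steps (split on `\W+`, drop empty strings, lowercase, count) and its `word: count` output format. The class `ReferenceWordCount` and its methods are hypothetical names, not part of Wayang.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical test helper: a local reference for the word-count pipeline.
class ReferenceWordCount {

    /** Splits on non-word characters, drops empty tokens, lowercases, and counts. */
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\W+")))
                .filter(word -> !word.isEmpty())
                .map(String::toLowerCase)
                // TreeMap keeps keys sorted so the result is deterministic.
                .collect(Collectors.toMap(w -> w, w -> 1, Integer::sum, TreeMap::new));
    }

    /** Formats each entry like the pipeline's text sink formatter: "word: count". */
    static List<String> format(Map<String, Integer> counts) {
        return counts.entrySet().stream()
                .map(e -> e.getKey() + ": " + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("Hello, world!", "hello Spark");
        format(count(lines)).forEach(System.out::println);
    }
}
```

The order of lines in a distributed output is generally not guaranteed, so a test would likely sort the Spark output lines (or compare them as a set) before checking them against this reference.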
Why register only Spark in tests? If you register both Java and Spark, the optimizer will keep small test datasets on Java Streams (correct behavior, but it means Spark is never exercised). Registering Spark alone forces every operator onto Spark so you can confirm connectivity, serialization, and UDF behavior.
To let the optimizer choose instead, register both platforms:
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;

WayangContext wayang = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin())
        .withPlugin(Spark.basicPlugin());
// The optimizer now picks the cheaper platform per operator.
// Small stages stay local; large shuffles and scans go to Spark.
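The difference between registering one platform and registering two can be illustrated with a toy chooser. This is only a sketch of the idea that the optimizer picks the cheapest option among the registered platforms. It is not Wayang's optimizer: the class `ToyPlatformChooser`, the platform labels, and the cost formulas (local time equals the work, Spark time equals 4500 ms plus half the work) are all assumptions made for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

// Illustrative only: a stand-in for "pick the cheapest registered platform".
class ToyPlatformChooser {

    /** Assumed cost estimates in milliseconds for a given amount of single-core work. */
    static double estimateMs(String platform, double workMs) {
        switch (platform) {
            case "java":
                return workMs;              // no startup cost, no parallelism
            case "spark":
                return 4500 + workMs / 2;   // fixed startup cost, two parallel slots
            default:
                throw new IllegalArgumentException("No estimate for platform: " + platform);
        }
    }

    /** Returns the registered platform with the lowest estimated cost. */
    static String choose(Set<String> registered, double workMs) {
        String best = null;
        double bestCost = Double.POSITIVE_INFINITY;
        // Sorted iteration keeps the result deterministic.
        for (String platform : new TreeSet<>(registered)) {
            double cost = estimateMs(platform, workMs);
            if (cost < bestCost) {
                best = platform;
                bestCost = cost;
            }
        }
        if (best == null) {
            throw new IllegalStateException("No platform registered");
        }
        return best;
    }

    public static void main(String[] args) {
        System.out.println(choose(Set.of("java", "spark"), 100));        // small job
        System.out.println(choose(Set.of("java", "spark"), 1_000_000));  // large job
        System.out.println(choose(Set.of("spark"), 100));                // Spark only
    }
}
```

With both platforms registered, the small job lands on `java` and the large one on `spark`. With only `spark` registered, even the small job goes to Spark, which is the behavior the Spark-only test setup relies on.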

Adding Spark ML operators

WayangContext wayang = new WayangContext(new Configuration())
        .withPlugin(Java.basicPlugin())
        .withPlugin(Spark.basicPlugin())
        .withPlugin(Spark.mlPlugin()); // adds KMeans, LinearRegression, etc.
