Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/apache/wayang/llms.txt

Use this file to discover all available pages before exploring further.

Wayang’s Spark backend can run entire pipelines on Spark Dataset[Row] — also known as DataFrames. Use this mode when you ingest data from lakehouse formats such as Parquet or Delta, interoperate with Spark ML stages, or prefer schema-aware processing over raw RDDs. This guide explains how to opt in, how mixing Dataset and RDD operators works, and what to watch for in practice.

When to use Dataset channels

Lakehouse storage

Reading Parquet or Delta directly into datasets avoids repeated schema inference and keeps Spark’s optimized Parquet reader in play.

Spark ML

ML operators already convert RDDs into DataFrames internally. Feeding them a dataset channel skips that conversion and preserves column names.

Federated pipelines

Mix dataset-backed Spark stages with other platforms. Wayang inserts conversions only where strictly necessary.

How preferDataset works

The ParquetSource logical operator exposes a preferDatasetOutput(boolean) builder method. When set to true, the Spark execution operator (SparkParquetSource) will advertise DatasetChannel.UNCACHED_DESCRIPTOR as its preferred output channel and hand a Dataset<Row> directly to downstream operators instead of converting to an RDD.
// Java API — read Parquet into a DatasetChannel
ParquetSource source = new ParquetSource("hdfs:///data/sales.parquet",
    new String[]{"customer_id", "amount"});
source.preferDatasetOutput(true);   // opt in to Dataset channel
Similarly, ParquetSink accepts a preferDataset flag in its constructor:
// Write back to Parquet without converting to RDD first
new ParquetSink("hdfs:///data/out/", /* isOverwrite */ true, /* preferDataset */ true);

Full example pipeline

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.operators.ParquetSource;
import org.apache.wayang.basic.operators.ParquetSink;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.spark.Spark;

WayangContext wayang = new WayangContext(new Configuration())
    .withPlugin(Spark.basicPlugin());

JavaPlanBuilder plan = new JavaPlanBuilder(wayang);

// Source: Dataset channel preferred
ParquetSource source = new ParquetSource(
    "hdfs:///warehouse/events.parquet", null);
source.preferDatasetOutput(true);

// Sink: stays in Dataset channel
ParquetSink sink = new ParquetSink(
    "hdfs:///warehouse/events_out.parquet", true, true);

plan.readParquet(source)
    .filter(record -> (Long) record.getField(2) > 100L)
    .writeTo(sink);

plan.buildAndExecute();

Mixing with RDD operators

If a downstream operator only supports RddChannel, Wayang inserts a conversion operator automatically. Two conversion operators are available:

SparkRddToDatasetOperator

Converts a JavaRDD<Record> into a Dataset<Row>. The conversion uses DatasetConverters.recordsToDataset(...), which infers the schema from the RecordType attached to the data type, or falls back to sampled schema inference.
Input:  RddChannel (JavaRDD<Record>)
Output: DatasetChannel (Dataset<Row>)
Load profile key: wayang.spark.rdd-to-dataset.load

SparkDatasetToRddOperator

Converts a Dataset<Row> back into a JavaRDD<Record>.
Input:  DatasetChannel (Dataset<Row>)
Output: RddChannel (JavaRDD<Record>)
Both conversions carry non-trivial load profiles and will appear in plan explanations when you mix channel types. You can inspect them by enabling plan explanation:
config.setProperty("wayang.core.explain.enabled", "true");
config.setProperty("wayang.core.explain.directrory", "/tmp/wayang-plans/");

RecordType for schema preservation

RecordType carries field names so that SparkRddToDatasetOperator can produce a precise schema without sampling the RDD. Provide field names when constructing your sources:
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.core.types.DataSetType;

// Declare field names for a typed Record
RecordType schema = new RecordType("customer_id", "product_id", "amount", "ts");
DataSetType<Record> typedSource =
    DataSetType.createDefault(schema);

// Pass as the output type when constructing ParquetSource
ParquetSource source = new ParquetSource(
    "hdfs:///data/orders.parquet",
    new String[]{"customer_id", "amount"},   // projection
    schema.getFieldNames());
When a RecordType is present, the RDD-to-Dataset conversion maps each field by name rather than by column position, which avoids silent schema mismatches when parquet files evolve.

Developer checklist

1

Use RecordType whenever possible

Attach field names to your logical operator output types. This lets the RDD↔Dataset converters produce a precise schema and avoids sampling overhead.
2

Reuse sparkExecutor.ss

When writing custom Spark execution operators that build DataFrames, always obtain the SparkSession from sparkExecutor.ss rather than calling SparkSession.builder(). Creating extra contexts causes failures unless spark.driver.allowMultipleContexts is true.
3

Inspect plan explanations

Enable wayang.core.explain.enabled = true and review the generated plan files to verify that dataset-to-RDD conversions are not inserted unexpectedly. Each conversion adds serialization overhead.
4

Advertise DatasetChannel in custom operators

If you write a custom execution operator that consumes or produces a Dataset<Row>, declare DatasetChannel.UNCACHED_DESCRIPTOR (and optionally DatasetChannel.CACHED_DESCRIPTOR) in getSupportedInputChannels / getSupportedOutputChannels. Otherwise the optimizer cannot keep data in dataset form through your operator.
5

Prefer dataset-aware operator chains

Once a plan starts with a dataset channel, the optimizer assigns a cost to any Dataset↔RDD conversion it must insert. Keeping all operators dataset-aware removes that cost entirely and lets Spark’s Catalyst optimizer inline the whole pipeline.

Current limitations

The following limitations apply to the current release. Contributions to address them are welcome.
  • Parquet only. Only ParquetSource and ParquetSink expose the preferDataset API today. Text sources (TextFileSource) and object sources still produce RddChannel output.
  • ML4All pipelines. ML4All stages currently emit plain double[] / Double RDDs. They benefit from the internal DataFrame conversions within ML operators but do not expose DatasetChannel outputs yet.
  • No dataset-aware map/filter. The standard MapOperator and FilterOperator Spark implementations use RddChannel. A dataset-aware variant would allow schema-preserving transforms without leaving Catalyst’s purview.
  • No Delta Lake source. There is no built-in Delta source yet; Delta tables can be read as Parquet using SparkParquetSource but without Delta transaction log awareness.

Spark channel type summary

ChannelClassCarriesNotes
Uncached RDDRddChannel.UNCACHED_DESCRIPTORJavaRDD<T>Default Spark channel
Cached RDDRddChannel.CACHED_DESCRIPTORPersisted JavaRDD<T>Used when data is reused
Uncached DatasetDatasetChannel.UNCACHED_DESCRIPTORDataset<Row>Preferred for Parquet/lakehouse
Cached DatasetDatasetChannel.CACHED_DESCRIPTORPersisted Dataset<Row>For multi-consumer dataset plans
BroadcastBroadcastChannel.DESCRIPTORBroadcast variableFor small side-inputs

Build docs developers (and LLMs) love