Apache Wayang’s Scala API is the most expressive way to build cross-platform data pipelines. PlanBuilder accepts Scala lambdas, tuples, and case classes natively, with no adapter boilerplate. Every transformation returns a DataQuanta[T], a typed reference to an intermediate dataset in the logical plan. The plan is built lazily: nothing executes until you call a sink such as collect() or writeTextFile().

Maven / SBT dependency

<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-api-scala-java_2.12</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-java</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-spark_2.12</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
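
For sbt builds, the same coordinates might be declared as in the sketch below. It is derived from the Maven snippet rather than taken from the Wayang documentation. WAYANG_VERSION remains a placeholder, and plain % (instead of %%) is an assumption based on the artifact IDs already carrying the _2.12 suffix.

```scala
// build.sbt (sketch): same coordinates as the Maven snippet above.
// Assumption: the Scala suffix is part of the artifact ID, so % is used, not %%.
libraryDependencies ++= Seq(
  "org.apache.wayang" % "wayang-api-scala-java_2.12" % "WAYANG_VERSION",
  "org.apache.wayang" % "wayang-java"                % "WAYANG_VERSION",
  "org.apache.wayang" % "wayang-spark_2.12"          % "WAYANG_VERSION"
)
```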

Creating WayangContext and PlanBuilder

import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

val wayangContext = new WayangContext(new Configuration)
  .withPlugin(Java.basicPlugin)
  .withPlugin(Spark.basicPlugin)

val planBuilder = new PlanBuilder(wayangContext)
  .withJobName("MyScalaJob")
  .withUdfJarsOf(this.getClass)
PlanBuilder constructor:
class PlanBuilder(wayangContext: WayangContext, jobName: String = null)

Builder configuration methods

| Method | Returns | Description |
| --- | --- | --- |
| withJobName(name: String) | PlanBuilder | Labels the job in the cost model and logs. |
| withUdfJars(paths: String*) | PlanBuilder | Registers JAR files for remote engines. |
| withUdfJarsOf(classes: Class[_]*) | PlanBuilder | Resolves and registers JARs from class locations. |
| withExperiment(exp: Experiment) | PlanBuilder | Attaches a ProfileDB experiment for metrics collection. |

Source operators

readTextFile

def readTextFile(url: String): DataQuanta[String]
Reads a text file line by line. The URL must be fully qualified (file://, hdfs://, etc.).
val lines = planBuilder.readTextFile("file:///data/corpus.txt")

readObjectFile

def readObjectFile[T: ClassTag](url: String): DataQuanta[T]
Reads a serialized object file where every element is of type T.

readParquet

def readParquet(url: String,
                projection: Array[String] = null,
                preferDataset: Boolean = false): DataQuanta[Record]
Reads a Parquet file and returns DataQuanta[Record]. Pass a projection array to select only specific columns. Set preferDataset = true to keep the result in a Spark Dataset for downstream dataset-aware operations.

readApacheIcebergTable

def readApacheIcebergTable(
  catalog: Catalog,
  tableIdentifier: TableIdentifier,
  filterExpressions: Array[Expression] = null,
  projectionColumns: Array[String] = null): DataQuanta[Record]
Reads an Apache Iceberg table through an Iceberg Catalog instance.

readTable

def readTable(source: TableSource): DataQuanta[Record]
Reads records from a JDBC-compatible TableSource.

loadCollection

def loadCollection[T: ClassTag](collection: java.util.Collection[T]): DataQuanta[T]
def loadCollection[T: ClassTag](iterable: Iterable[T]): DataQuanta[T]
Wraps an in-memory collection or Iterable as a DataQuanta. Both Java and Scala collections are accepted.
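
As a minimal sketch of how a collection source, a transformation, and a sink fit together, the following loads a Scala Seq, doubles each element, and collects the result. The object name and job name are hypothetical, and only the Java plugin is registered so the example runs locally.

```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java

// Hypothetical wrapper object, for illustration only.
object LoadCollectionSketch {
  def run(): List[Int] = {
    val ctx = new WayangContext(new Configuration).withPlugin(Java.basicPlugin)
    val planBuilder = new PlanBuilder(ctx).withJobName("load-collection-sketch")

    planBuilder
      .loadCollection(Seq(1, 2, 3, 4)) // source: in-memory Scala collection
      .map(_ * 2)                      // transformation: only extends the plan
      .collect()                       // sink: triggers execution
      .toList
  }
}
```

Sorting the collected result before comparing it avoids relying on any particular output order.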

Cloud storage sources

def readGoogleCloudStorageFile(bucket: String, blobName: String, credentialsPath: String): DataQuanta[String]
def readAmazonS3File(bucket: String, blobName: String, credentialsPath: String): DataQuanta[String]
def readAzureBlobStorageFile(container: String, blobName: String, credentialsPath: String): DataQuanta[String]

DataQuanta transformation methods

DataQuanta[Out] is the central abstraction. Every method below returns a new DataQuanta unless it is a sink.

map

def map[NewOut: ClassTag](udf: Out => NewOut,
                          udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
One-to-one transformation using a Scala lambda.
.map(word => (word.toLowerCase, 1))

mapJava

def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
                               udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Accepts an explicit SerializableFunction — useful when working with extended functions that override open(ExecutionContext).

mapPartitions

def mapPartitions[NewOut: ClassTag](udf: Iterable[Out] => Iterable[NewOut],
                                    selectivity: ProbabilisticDoubleInterval = null,
                                    udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Transforms an entire partition at a time. Reduces serialization overhead for complex iterative algorithms.

filter

def filter(udf: Out => Boolean,
           sqlUdf: String = null,
           selectivity: ProbabilisticDoubleInterval = null,
           udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
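Keeps elements for which udf returns true. The optional selectivity is a hint to the optimizer about the expected fraction of elements that pass the filter.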
.filter(_.nonEmpty, selectivity = 0.99)

flatMap

def flatMap[NewOut: ClassTag](udf: Out => Iterable[NewOut],
                               selectivity: ProbabilisticDoubleInterval = null,
                               udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Expands each element into zero or more output elements.
.flatMap(_.split("\\W+"), selectivity = 10)

reduceByKey

def reduceByKey[Key: ClassTag](keyUdf: Out => Key,
                                udf: (Out, Out) => Out,
                                udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Groups by keyUdf and reduces within each group using the binary udf.
.reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))
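
To make the contract of the two UDFs concrete, here is a plain-Scala model of what reduceByKey computes on a local collection. It is not Wayang's implementation, and the helper name is hypothetical. Note that the reducing function works on whole elements, so it has to carry the key through itself, as the word-count lambda does with c1._1.

```scala
// Plain-Scala model of reduceByKey semantics; not Wayang code.
object ReduceByKeyModel {
  // Hypothetical helper: group by the extracted key, then fold each group pairwise.
  def reduceByKey[T, K](data: Seq[T])(keyUdf: T => K, udf: (T, T) => T): Map[K, T] =
    data.groupBy(keyUdf).map { case (key, group) => key -> group.reduce(udf) }

  def demo(): Map[String, (String, Int)] = {
    val words = Seq(("a", 1), ("b", 1), ("a", 1))
    // Same lambdas as in the Wayang snippet above.
    reduceByKey[(String, Int), String](words)(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))
  }
}
```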

reduce

def reduce(udf: (Out, Out) => Out,
           udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Global reduce across the entire dataset.

groupByKey

def groupByKey[Key: ClassTag](keyUdf: Out => Key,
                               keyUdfLoad: LoadProfileEstimator = null): DataQuanta[java.lang.Iterable[Out]]
Groups elements by key and materializes each group as an Iterable.
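
Because each group arrives as a java.lang.Iterable, a downstream Scala lambda typically needs a conversion before using collection methods. The sketch below groups words by their first letter and counts each group. The object name is hypothetical, and scala.collection.JavaConverters is assumed to be available, which matches the Scala 2.12 artifacts listed earlier.

```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import scala.collection.JavaConverters._

// Hypothetical wrapper object, for illustration only.
object GroupByKeySketch {
  def run(): Set[(Char, Int)] = {
    val ctx = new WayangContext(new Configuration).withPlugin(Java.basicPlugin)
    val planBuilder = new PlanBuilder(ctx).withJobName("group-by-key-sketch")

    planBuilder
      .loadCollection(Seq("apple", "avocado", "banana"))
      .groupByKey(_.head)                  // key: first character of each word
      .map { group =>
        val words = group.asScala.toList   // java.lang.Iterable -> Scala List
        (words.head.head, words.size)      // (first letter, group size)
      }
      .collect()
      .toSet
  }
}
```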

group

def group(): DataQuanta[java.lang.Iterable[Out]]
Collects the entire dataset into a single Iterable group — equivalent to GROUP BY with no key.

count

def count: DataQuanta[java.lang.Long]
Returns the total number of elements as a single-element dataset.

sort

def sort[Key: ClassTag](keyUdf: Out => Key): DataQuanta[Out]
Sorts elements by the extracted key in ascending order.

distinct

def distinct: DataQuanta[Out]
Removes duplicate elements (uses structural equality).

union

def union(that: DataQuanta[Out]): DataQuanta[Out]
Produces all elements from both datasets (bag union — duplicates are kept).

intersect

def intersect(that: DataQuanta[Out]): DataQuanta[Out]
Produces only elements present in both datasets.

join

def join[ThatOut: ClassTag, Key: ClassTag](
  thisKeyUdf: Out => Key,
  that: DataQuanta[ThatOut],
  thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[Out, ThatOut]]
Equi-join on keys extracted by thisKeyUdf and thatKeyUdf. The output element type is WayangTuple2[Out, ThatOut] with .field0 and .field1 accessors.
val joined = users.join(_.id, orders, _.userId)
val result = joined.map(t => (t.field0.name, t.field1.amount))
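
The snippet above leaves users and orders undefined. A self-contained sketch might look like the following, where the User and Order case classes, the sample data, and the object name are all hypothetical. The type parameters of join are spelled out explicitly to keep type inference out of the picture.

```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java

// Hypothetical record types, for illustration only.
case class User(id: Long, name: String)
case class Order(userId: Long, amount: Double)

object JoinSketch {
  def run(): Set[(String, Double)] = {
    val ctx = new WayangContext(new Configuration).withPlugin(Java.basicPlugin)
    val planBuilder = new PlanBuilder(ctx).withJobName("join-sketch")

    val users  = planBuilder.loadCollection(Seq(User(1, "ada"), User(2, "bob")))
    val orders = planBuilder.loadCollection(Seq(Order(1, 9.5), Order(1, 0.5), Order(3, 4.0)))

    users
      .join[Order, Long](_.id, orders, _.userId)   // equi-join on user ID
      .map(t => (t.field0.name, t.field1.amount))  // field0 = User, field1 = Order
      .collect()
      .toSet
  }
}
```

Users without orders and orders without a matching user are dropped, as expected for an inner equi-join.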

coGroup

def coGroup[ThatOut: ClassTag, Key: ClassTag](
  thisKeyUdf: Out => Key,
  that: DataQuanta[ThatOut],
  thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[Iterable[Out], Iterable[ThatOut]]]
Like join but outputs pairs of grouped iterables instead of individual joined records.

cartesian

def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuanta[WayangTuple2[Out, ThatOut]]
Computes the Cartesian product.

sample

def sample(sampleSize: Int,
           datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
           seed: Option[Long] = None,
           sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out]
Draws a random sample of sampleSize elements.

zipWithId

def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]]
Attaches a unique monotonically increasing ID to each element.

project

def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut]
Projects named fields from a Record using a ProjectionDescriptor. Requires the element type to be Record or a compatible bean.

doWhile

def doWhile[ConvOut: ClassTag](
  udf: Iterable[ConvOut] => Boolean,
  bodyBuilder: DataQuanta[Out] => (DataQuanta[Out], DataQuanta[ConvOut]),
  numExpectedIterations: Int = 20,
  udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Executes the loop body until udf applied to the convergence dataset returns true. The bodyBuilder function receives the current iteration’s data and returns a pair (nextIteration, convergenceData).
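// tolerance, updateStep, and computeError are placeholders for your own values and functions.
initialData.doWhile[Double](
  convergence => convergence.head < tolerance,       // stop once the error is small enough
  current => {
    val next = current.map(updateStep)
    val conv = next.map(computeError).reduce(_ + _)  // single-element convergence dataset
    (next, conv)
  }
)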
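
The loop contract can be modelled in plain Scala as follows. This is a sketch of the semantics described above (run the body, then test the convergence data), not Wayang's implementation; the helper and object names are hypothetical.

```scala
// Plain-Scala model of the doWhile contract; not Wayang code.
object DoWhileModel {
  // Hypothetical helper: runs `body` at least once and stops as soon as
  // `converged` returns true for the convergence data of the latest iteration.
  def doWhile[A, C](initial: Seq[A])(
      converged: Iterable[C] => Boolean,
      body: Seq[A] => (Seq[A], Seq[C])): Seq[A] = {
    var current = initial
    var done = false
    while (!done) {
      val (next, conv) = body(current) // (nextIteration, convergenceData)
      current = next
      done = converged(conv)
    }
    current
  }

  // Halve a value until it drops below 1.0.
  def demo(): Seq[Double] =
    doWhile[Double, Double](Seq(8.0))(
      conv => conv.head < 1.0,
      xs => { val next = xs.map(_ / 2); (next, next) }
    )
}
```

In this model the condition is checked after each pass, so the body always runs at least once and the returned data is the output of the last executed iteration.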

repeat

def repeat(n: Int, bodyBuilder: DataQuanta[Out] => DataQuanta[Out]): DataQuanta[Out]
Executes the loop body exactly n times.
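// assignPoints is a placeholder; the element type is assumed to provide cluster, addPoints, and average.
initialCentroids.repeat(100, current =>
  current.map(assignPoints)
         .reduceByKey(_.cluster, _.addPoints(_))
         .map(_.average)
)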

withBroadcast

def withBroadcast(sender: DataQuanta[_], broadcastName: String): DataQuanta[Out]
Broadcasts sender to this operator’s UDF under the key broadcastName. Retrieve it inside ExtendedSerializableFunction.open() via executionCtx.getBroadcast[T](broadcastName).

Sink operators

collect

def collect(): Iterable[Out]
Triggers execution and returns all results to the driver as a Scala Iterable.

foreach

def foreach(f: Out => _): Unit
Triggers execution and applies a side-effecting function to each element.

writeTextFile

def writeTextFile(url: String,
                  formatterUdf: Out => String,
                  udfLoad: LoadProfileEstimator = null): Unit
Writes each element, formatted by formatterUdf, as a line in a text file.

writeObjectFile

def writeObjectFile(url: String)(implicit classTag: ClassTag[Out]): Unit
Serializes elements to an object file.

writeParquet

def writeParquet(url: String,
                 overwrite: Boolean = false,
                 preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit
Writes Record elements to a Parquet file. Requires the element type to be Record (enforced by the implicit evidence parameter).

Operator annotation methods

Append these to any DataQuanta in the chain before calling the next transformation or sink.
.withName("My operator")
.withCardinalityEstimator((in: Long) => math.round(in * 0.01))
.withTargetPlatforms(Spark.platform)
.withUdfJarsOf(this.getClass)

RecordDataQuanta — schema-aware operations

When your DataQuanta holds Record elements you gain access to RecordDataQuanta via an implicit conversion defined in the org.apache.wayang.api package.
import org.apache.wayang.api._

val records: DataQuanta[Record] = planBuilder.readParquet("hdfs:///warehouse/sales.parquet")

// Project specific fields — returns DataQuanta[Record]
val projected = records.projectRecords(Seq("product_id", "revenue"))
def projectRecords(fieldNames: Seq[String],
                   udfCpuLoad: LoadEstimator = null,
                   udfRamLoad: LoadEstimator = null): DataQuanta[Record]
The implicit conversion is triggered automatically when you call projectRecords on a DataQuanta[Record].

MultiContextPlanBuilder — fan-out across WayangContexts

MultiContextPlanBuilder runs the same logical plan against multiple WayangContext instances simultaneously. This is useful for cross-cluster processing or A/B testing across different engine configurations.
import org.apache.wayang.api._
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

// MultiContext extends WayangContext — pass a Configuration, then register plugins.
val ctx1 = new MultiContext(new Configuration).withPlugin(Java.basicPlugin)
val ctx2 = new MultiContext(new Configuration).withPlugin(Spark.basicPlugin)

val multiBuilder = new MultiContextPlanBuilder(List(ctx1, ctx2))
  .withUdfJarsOf(this.getClass)

// Read a text file in each context independently.
val results = multiBuilder
  .readTextFile(ctx1, "file:///data/part-a.txt")
  .readTextFile(ctx2, "file:///data/part-b.txt")

// Apply transformations uniformly across all contexts.
// Each context gets its own sink via MultiContext.withTextFileSink before calling execute().
ctx1.withTextFileSink("file:///output/part-a.txt")
ctx2.withTextFileSink("file:///output/part-b.txt")

results
  .forEach(_.flatMap(_.split("\\W+")))
  .forEach(_.filter(_.nonEmpty))
  .execute()
MultiContextPlanBuilder key methods:
| Method | Description |
| --- | --- |
| readTextFile(ctx, url) | Reads a text file in the specified context. |
| readObjectFile[T](ctx, url) | Reads an object file in the specified context. |
| loadCollection[T](ctx, iterable) | Loads an in-memory collection in the specified context. |
| forEach[Out](f: PlanBuilder => DataQuanta[Out]) | Applies f to each context’s PlanBuilder; returns a MultiContextDataQuanta. |
| withUdfJarsOf(classes*) | Registers UDF JARs for all contexts. |
MultiContextDataQuanta key methods:
| Method | Description |
| --- | --- |
| forEach[NewOut](f: DataQuanta[Out] => DataQuanta[NewOut]) | Applies a transformation f to each context’s DataQuanta. |
| combineEach[ThatOut, NewOut](that, f) | Merges two MultiContextDataQuanta element-wise using f. |
| withTargetPlatforms(platforms*) | Restricts all contexts’ operators to the given platforms. |
| execute(timeout) | Executes all context plans in parallel, blocking until all finish. |
| mergeUnion(mergeContext) | Executes all contexts, then unions the results into a single DataQuanta. |

EdgeDataQuanta — graph processing

Import org.apache.wayang.api.graph._ to unlock graph-specific extensions. An Edge is a simple pair (source: Long, target: Long). A DataQuanta[Edge] is automatically enriched to an EdgeDataQuanta via implicit conversion.
import org.apache.wayang.api._
import org.apache.wayang.api.graph._

val edges: DataQuanta[Edge] = planBuilder
  .readTextFile("file:///graph.tsv")
  .map(line => { val parts = line.split("\t"); Edge(parts(0).toLong, parts(1).toLong) })

// Run PageRank for 20 iterations.
val pageRanks: DataQuanta[PageRank] = edges.pageRank(numIterations = 20)
pageRanks.collect().foreach(pr => println(s"Node ${pr.nodeId}: ${pr.rank}"))
def pageRank(
  numIterations: Int = 20,
  dampingFactor: Double = PageRankOperator.DEFAULT_DAMPING_FACTOR,
  graphDensity: ProbabilisticDoubleInterval = PageRankOperator.DEFAULT_GRAPH_DENSITIY
): DataQuanta[PageRank]
The PageRank output type exposes .nodeId: Long and .rank: Float.

Complete Scala WordCount example

This is the canonical Wayang WordCount pipeline taken directly from the project examples.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark

object WordCountScala {
  def main(args: Array[String]): Unit = {

    val inputUrl = "file:///tmp/words.txt"

    // Register execution platforms — Wayang's optimizer will pick per step.
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)

    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"WordCount ($inputUrl)")
      .withUdfJarsOf(this.getClass)

    val wordcounts = planBuilder
      // 1. Read the text file.
      .readTextFile(inputUrl).withName("Load file")

      // 2. Split each line on non-word characters.
      .flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")

      // 3. Drop empty strings from leading/trailing punctuation.
      .filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")

      // 4. Pair each word with count 1.
      .map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")

      // 5. Sum counts per word.
      .reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Sum counts")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))

      // 6. Collect results to the driver.
      .collect()

    println(wordcounts)
  }
}

Iterative k-means example

repeat enables iterative algorithms such as k-means. The pipeline reads a point cloud, initialises random centroids, and runs 100 iterations of the reassign-and-average loop.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import scala.util.Random
import scala.collection.JavaConversions._

object KMeans {
  def main(args: Array[String]): Unit = {
    val inputUrl = "file:///kmeans.txt"
    val k = 5
    val iterations = 100

    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)
      .withPlugin(Spark.basicPlugin)
    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
      .withUdfJarsOf(this.getClass)

    case class Point(x: Double, y: Double)
    case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
      def addPoints(that: TaggedPointCounter) =
        TaggedPointCounter(x + that.x, y + that.y, cluster, count + that.count)
      def average = TaggedPointCounter(x / count, y / count, cluster, 0)
    }

    val points = planBuilder.readTextFile(inputUrl)
      .map { line => val f = line.split(","); Point(f(0).toDouble, f(1).toDouble) }

    val random = new Random
    val initialCentroids = planBuilder.loadCollection(
      for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)
    )

    class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
      var centroids: Iterable[TaggedPointCounter] = _
      override def open(ctx: ExecutionContext): Unit =
        centroids = ctx.getBroadcast[TaggedPointCounter]("centroids")
      override def apply(p: Point): TaggedPointCounter = {
        var minDist = Double.PositiveInfinity
        var nearestId = -1
        for (c <- centroids) {
          val d = math.sqrt(math.pow(p.x - c.x, 2) + math.pow(p.y - c.y, 2))
          if (d < minDist) { minDist = d; nearestId = c.cluster }
        }
        TaggedPointCounter(p.x, p.y, nearestId, 1)
      }
    }

    val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
      points.mapJava(new SelectNearestCentroid)
            .withBroadcast(currentCentroids, "centroids")
            .withName("Find nearest centroid")
            .reduceByKey(_.cluster, _.addPoints(_))
            .withName("Add up points")
            .map(_.average)
            .withName("Average points")
    }).withName("k-means loop").collect()

    println(finalCentroids)
  }
}
The ExtendedSerializableFunction pattern (with an open() hook that retrieves broadcast variables) is the recommended way to access side data inside map UDFs on distributed engines such as Spark.
