Apache Wayang’s Scala API is the most expressive way to build cross-platform data pipelines. PlanBuilder accepts Scala lambdas, tuples, and case classes natively — no adapter boilerplate required. Every transformation returns a DataQuanta[T], a typed reference to an intermediate dataset in the logical plan. The plan is only materialized when you call a sink such as collect() or writeTextFile().
Maven / SBT dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-api-scala-java_2.12</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-java</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spark_2.12</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
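The heading also mentions SBT, so here is a sketch of what the same three artifacts might look like in a `build.sbt`. It only mirrors the Maven coordinates above: `WAYANG_VERSION` remains a placeholder, `wayangVersion` is a name chosen for this example, and a single `%` is used because the `_2.12` Scala suffix is already part of the artifact IDs.

```scala
// build.sbt (sketch mirroring the Maven coordinates above)
// WAYANG_VERSION is a placeholder: substitute the release you actually use.
val wayangVersion = "WAYANG_VERSION"

libraryDependencies ++= Seq(
  "org.apache.wayang" % "wayang-api-scala-java_2.12" % wayangVersion,
  "org.apache.wayang" % "wayang-java"                % wayangVersion,
  "org.apache.wayang" % "wayang-spark_2.12"          % wayangVersion
)
```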
Creating WayangContext and PlanBuilder
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
val wayangContext = new WayangContext(new Configuration)
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
.withJobName("MyScalaJob")
.withUdfJarsOf(this.getClass)
PlanBuilder constructor:
class PlanBuilder(wayangContext: WayangContext, jobName: String = null)
Builder configuration methods
| Method | Returns | Description |
|---|---|---|
| withJobName(name: String) | PlanBuilder | Labels the job in the cost model and logs. |
| withUdfJars(paths: String*) | PlanBuilder | Registers JAR files for remote engines. |
| withUdfJarsOf(classes: Class[_]*) | PlanBuilder | Resolves and registers JARs from class locations. |
| withExperiment(exp: Experiment) | PlanBuilder | Attaches a ProfileDB experiment for metrics collection. |
Source operators
readTextFile
def readTextFile(url: String): DataQuanta[String]
Reads a text file line by line. The URL must be fully qualified, including its scheme (file://, hdfs://, etc.).
val lines = planBuilder.readTextFile("file:///data/corpus.txt")
readObjectFile
def readObjectFile[T: ClassTag](url: String): DataQuanta[T]
Reads a serialized object file where every element is of type T.
readParquet
def readParquet(url: String,
projection: Array[String] = null,
preferDataset: Boolean = false): DataQuanta[Record]
Reads a Parquet file and returns DataQuanta[Record]. Pass a projection array to select only specific columns. Set preferDataset = true to keep the result in a Spark Dataset for downstream dataset-aware operations.
readApacheIcebergTable
def readApacheIcebergTable(
catalog: Catalog,
tableIdentifier: TableIdentifier,
filterExpressions: Array[Expression] = null,
projectionColumns: Array[String] = null): DataQuanta[Record]
Reads an Apache Iceberg table through an Iceberg Catalog instance.
readTable
def readTable(source: TableSource): DataQuanta[Record]
Reads records from a JDBC-compatible TableSource.
loadCollection
def loadCollection[T: ClassTag](collection: java.util.Collection[T]): DataQuanta[T]
def loadCollection[T: ClassTag](iterable: Iterable[T]): DataQuanta[T]
Wraps an in-memory collection or Iterable as a DataQuanta. Both Java and Scala collections are accepted.
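As a minimal sketch of the Scala `Iterable` overload, the following loads a small `Seq`, applies one `map`, and collects the result. It assumes the Wayang artifacts from the dependency section are on the classpath and registers only the Java plugin so that it runs locally; the object name and job name are hypothetical, and every Wayang call is one already shown on this page.

```scala
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java

object LoadCollectionSketch {
  // Builds and runs a tiny plan on the local Java platform only.
  def run(): Seq[String] = {
    val wayangContext = new WayangContext(new Configuration)
      .withPlugin(Java.basicPlugin)

    val planBuilder = new PlanBuilder(wayangContext)
      .withJobName("LoadCollectionSketch") // hypothetical job name

    planBuilder
      .loadCollection(Seq("wayang", "scala")) // Scala Iterable overload
      .map(_.toUpperCase)
      .collect()                              // sink: triggers execution
      .toSeq
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Nothing is executed until `collect()` is reached; the preceding calls only extend the logical plan.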
Cloud storage sources
def readGoogleCloudStorageFile(bucket: String, blobName: String, credentialsPath: String): DataQuanta[String]
def readAmazonS3File(bucket: String, blobName: String, credentialsPath: String): DataQuanta[String]
def readAzureBlobStorageFile(container: String, blobName: String, credentialsPath: String): DataQuanta[String]
DataQuanta[Out] is the central abstraction. Every method below returns a new DataQuanta unless it is a sink.
map
def map[NewOut: ClassTag](udf: Out => NewOut,
udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
One-to-one transformation using a Scala lambda.
.map(word => (word.toLowerCase, 1))
mapJava
def mapJava[NewOut: ClassTag](udf: SerializableFunction[Out, NewOut],
udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Accepts an explicit SerializableFunction — useful when working with extended functions that override open(ExecutionContext).
mapPartitions
def mapPartitions[NewOut: ClassTag](udf: Iterable[Out] => Iterable[NewOut],
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Transforms an entire partition at a time. Reduces serialization overhead for complex iterative algorithms.
filter
def filter(udf: Out => Boolean,
sqlUdf: String = null,
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Keeps elements where udf returns true.
.filter(_.nonEmpty, selectivity = 0.99)
flatMap
def flatMap[NewOut: ClassTag](udf: Out => Iterable[NewOut],
selectivity: ProbabilisticDoubleInterval = null,
udfLoad: LoadProfileEstimator = null): DataQuanta[NewOut]
Expands each element into zero or more output elements.
.flatMap(_.split("\\W+"), selectivity = 10)
reduceByKey
def reduceByKey[Key: ClassTag](keyUdf: Out => Key,
udf: (Out, Out) => Out,
udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Groups by keyUdf and reduces within each group using the binary udf.
.reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))
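To make the semantics concrete, here is a plain-collections model of what the call above computes. It is only a sketch using the Scala standard library, not Wayang's implementation: the helper `reduceByKey` and the sample data are hypothetical, and the result is shown as a `Map` for readability, whereas Wayang returns the reduced elements themselves as a DataQuanta[Out].

```scala
object ReduceByKeySketch {
  // Plain-collections model: group by key, then fold each group with the binary udf.
  def reduceByKey[Out, Key](data: Seq[Out])(keyUdf: Out => Key, udf: (Out, Out) => Out): Map[Key, Out] =
    data.groupBy(keyUdf).map { case (key, group) => key -> group.reduce(udf) }

  // Hypothetical input: (word, 1) pairs as produced by the map step.
  val pairs: Seq[(String, Int)] = Seq(("a", 1), ("b", 1), ("a", 1))

  // Same key extractor and reducer as in the snippet above.
  val counts: Map[String, (String, Int)] =
    reduceByKey(pairs)(_._1, (c1, c2) => (c1._1, c1._2 + c2._2))

  def main(args: Array[String]): Unit =
    println(counts.values.toList.sortBy(_._1)) // prints List((a,2), (b,1))
}
```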
reduce
def reduce(udf: (Out, Out) => Out,
udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Global reduce across the entire dataset.
groupByKey
def groupByKey[Key: ClassTag](keyUdf: Out => Key,
keyUdfLoad: LoadProfileEstimator = null): DataQuanta[java.lang.Iterable[Out]]
Groups elements by key and materializes each group as an Iterable.
group
def group(): DataQuanta[java.lang.Iterable[Out]]
Collects the entire dataset into a single Iterable group — equivalent to GROUP BY with no key.
count
def count: DataQuanta[java.lang.Long]
Returns the total number of elements as a single-element dataset.
sort
def sort[Key: ClassTag](keyUdf: Out => Key): DataQuanta[Out]
Sorts elements by the extracted key in ascending order.
distinct
def distinct: DataQuanta[Out]
Removes duplicate elements (uses structural equality).
union
def union(that: DataQuanta[Out]): DataQuanta[Out]
Produces all elements from both datasets (bag union — duplicates are kept).
intersect
def intersect(that: DataQuanta[Out]): DataQuanta[Out]
Produces only elements present in both datasets.
join
def join[ThatOut: ClassTag, Key: ClassTag](
thisKeyUdf: Out => Key,
that: DataQuanta[ThatOut],
thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[Out, ThatOut]]
Equi-join on keys extracted by thisKeyUdf and thatKeyUdf. The output element type is WayangTuple2[Out, ThatOut] with .field0 and .field1 accessors.
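// User and Order are hypothetical element types; the explicit type arguments
// let the Scala 2 compiler type the second key lambda.
val joined = users.join[Order, Long](_.id, orders, _.userId)
val result = joined.map(t => (t.field0.name, t.field1.amount))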
coGroup
def coGroup[ThatOut: ClassTag, Key: ClassTag](
thisKeyUdf: Out => Key,
that: DataQuanta[ThatOut],
thatKeyUdf: ThatOut => Key): DataQuanta[WayangTuple2[Iterable[Out], Iterable[ThatOut]]]
Like join but outputs pairs of grouped iterables instead of individual joined records.
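The difference between the two output shapes can be illustrated with ordinary Scala collections. This is a sketch of the semantics only, not Wayang code: `User`, `Order`, and the sample data are hypothetical, plain tuples stand in for WayangTuple2, and keys that occur on only one side are deliberately not modeled.

```scala
object JoinVsCoGroupSketch {
  // Hypothetical record types, used only for this illustration.
  case class User(id: Long, name: String)
  case class Order(userId: Long, amount: Double)

  val users  = Seq(User(1L, "ada"), User(2L, "bob"))
  val orders = Seq(Order(1L, 10.0), Order(1L, 5.0), Order(2L, 7.5))

  // join: one output pair per matching (user, order) combination.
  val joined: Seq[(User, Order)] =
    for (u <- users; o <- orders if u.id == o.userId) yield (u, o)

  // coGroup: one output pair per key, holding all matching elements of each side.
  val coGrouped: Seq[(Seq[User], Seq[Order])] = {
    val usersByKey  = users.groupBy(_.id)
    val ordersByKey = orders.groupBy(_.userId)
    usersByKey.keys.toSeq.sorted.map(k => (usersByKey(k), ordersByKey(k)))
  }

  def main(args: Array[String]): Unit = {
    println(joined.size)    // 3 joined records
    println(coGrouped.size) // 2 groups, one per key
  }
}
```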
cartesian
def cartesian[ThatOut: ClassTag](that: DataQuanta[ThatOut]): DataQuanta[WayangTuple2[Out, ThatOut]]
Computes the Cartesian product.
sample
def sample(sampleSize: Int,
datasetSize: Long = SampleOperator.UNKNOWN_DATASET_SIZE,
seed: Option[Long] = None,
sampleMethod: SampleOperator.Methods = SampleOperator.Methods.ANY): DataQuanta[Out]
Draws a random sample of sampleSize elements.
zipWithId
def zipWithId: DataQuanta[WayangTuple2[java.lang.Long, Out]]
Attaches a unique monotonically increasing ID to each element.
project
def project[NewOut: ClassTag](fieldNames: Seq[String]): DataQuanta[NewOut]
Projects named fields from a Record using a ProjectionDescriptor. Requires the element type to be Record or a compatible bean.
doWhile
def doWhile[ConvOut: ClassTag](
udf: Iterable[ConvOut] => Boolean,
bodyBuilder: DataQuanta[Out] => (DataQuanta[Out], DataQuanta[ConvOut]),
numExpectedIterations: Int = 20,
udfLoad: LoadProfileEstimator = null): DataQuanta[Out]
Executes the loop body until udf applied to the convergence dataset returns true. The bodyBuilder function receives the current iteration’s data and returns a pair (nextIteration, convergenceData).
initialData.doWhile[Double](
convergence => convergence.head < tolerance,
current => {
val next = current.map(updateStep)
val conv = next.map(computeError).reduce(_ + _)
(next, conv)
}
)
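The loop contract can be modeled with plain Scala collections. The sketch below is not Wayang's implementation; it assumes the convergence UDF is evaluated after each pass of the body, and the helper `doWhile`, the `tolerance` value, and the halving step are all hypothetical choices for illustration.

```scala
object DoWhileSketch {
  // Plain-collections model of the contract described above: run the body,
  // evaluate the convergence UDF on the convergence data, stop once it is true.
  def doWhile[Out, ConvOut](initial: Seq[Out])(
      udf: Iterable[ConvOut] => Boolean,
      body: Seq[Out] => (Seq[Out], Seq[ConvOut])): Seq[Out] = {
    var current = initial
    var converged = false
    while (!converged) {
      val (next, convergence) = body(current)
      current = next
      converged = udf(convergence)
    }
    current
  }

  val tolerance = 1e-3 // hypothetical threshold

  // Halve every value until the sum of all values drops below the tolerance.
  val result: Seq[Double] = doWhile[Double, Double](Seq(1.0, 2.0))(
    convergence => convergence.head < tolerance,
    current => {
      val next = current.map(_ / 2)
      val conv = Seq(next.sum) // single-element convergence dataset
      (next, conv)
    }
  )

  def main(args: Array[String]): Unit = println(result)
}
```

With these inputs the sum starts at 3 and halves on every pass, so the model stops after 12 passes, when 3 / 4096 falls below the tolerance.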
repeat
def repeat(n: Int, bodyBuilder: DataQuanta[Out] => DataQuanta[Out]): DataQuanta[Out]
Executes the loop body exactly n times.
initialCentroids.repeat(100, current =>
current.map(assignPoints)
.reduceByKey(_.cluster, _.addPoints(_))
.map(_.average)
)
withBroadcast
def withBroadcast(sender: DataQuanta[_], broadcastName: String): DataQuanta[Out]
Broadcasts sender to this operator’s UDF under the key broadcastName. Retrieve it inside ExtendedSerializableFunction.open() via executionCtx.getBroadcast[T](broadcastName).
Sink operators
collect
def collect(): Iterable[Out]
Triggers execution and returns all results to the driver as a Scala Iterable.
foreach
def foreach(f: Out => _): Unit
Triggers execution and applies a side-effecting function to each element.
writeTextFile
def writeTextFile(url: String,
formatterUdf: Out => String,
udfLoad: LoadProfileEstimator = null): Unit
Writes each element, formatted by formatterUdf, as a line in a text file.
writeObjectFile
def writeObjectFile(url: String)(implicit classTag: ClassTag[Out]): Unit
Serializes elements to an object file.
writeParquet
def writeParquet(url: String,
overwrite: Boolean = false,
preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit
Writes Record elements to a Parquet file. Requires the element type to be Record (enforced by the implicit evidence parameter).
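For readers unfamiliar with implicit evidence parameters, the following sketch shows how `=:=` restricts a method to one element type at compile time. `Record` and `Quanta` here are hypothetical stand-ins defined in the example, not Wayang's classes, and `writeRecords` is an invented method name.

```scala
object EvidenceSketch {
  // Hypothetical stand-ins for Wayang's Record and DataQuanta, for illustration only.
  final case class Record(fields: Seq[Any])

  final class Quanta[Out](val elements: Seq[Out]) {
    // Callable only when the compiler can prove that Out is exactly Record.
    def writeRecords()(implicit ev: Out =:= Record): Seq[Record] =
      elements.map(ev) // the evidence also acts as a conversion Out => Record
  }

  val records = new Quanta(Seq(Record(Seq("p1", 9.5)), Record(Seq("p2", 3.0))))
  val written: Seq[Record] = records.writeRecords() // compiles: Out is Record

  // new Quanta(Seq("a", "b")).writeRecords()
  // would be rejected: the compiler cannot prove that String =:= Record
}
```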
Operator annotation methods
Append these to any DataQuanta in the chain before calling the next transformation or sink.
.withName("My operator")
.withCardinalityEstimator((in: Long) => math.round(in * 0.01))
.withTargetPlatforms(Spark.platform)
.withUdfJarsOf(this.getClass)
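The function passed to withCardinalityEstimator above is an ordinary Scala function from input cardinality to estimated output cardinality. As a small worked example, the sketch below evaluates that same function on two hypothetical input sizes; the object name and the sizes are chosen only for illustration.

```scala
object CardinalityEstimatorSketch {
  // Same function as passed to withCardinalityEstimator above:
  // it predicts that about 1% of the input elements reach the output.
  val estimator: Long => Long = (in: Long) => math.round(in * 0.01)

  def main(args: Array[String]): Unit = {
    // Hypothetical input sizes, chosen only for illustration.
    Seq(1000L, 1000000L).foreach(n => println(s"$n -> ${estimator(n)}"))
  }
}
```

For 1,000 input elements the estimate is 10, and for 1,000,000 it is 10,000.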
RecordDataQuanta — schema-aware operations
When your DataQuanta holds Record elements, you gain access to RecordDataQuanta through an implicit conversion defined in the org.apache.wayang.api package.
import org.apache.wayang.api._
import org.apache.wayang.basic.data.Record
val records: DataQuanta[Record] = planBuilder.readParquet("hdfs:///warehouse/sales.parquet")
// Project specific fields — returns DataQuanta[Record]
val projected = records.projectRecords(Seq("product_id", "revenue"))
def projectRecords(fieldNames: Seq[String],
udfCpuLoad: LoadEstimator = null,
udfRamLoad: LoadEstimator = null): DataQuanta[Record]
The implicit conversion is triggered automatically when you call projectRecords on a DataQuanta[Record].
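The mechanism behind this is the usual Scala enrichment pattern. The sketch below reproduces it with hypothetical stand-in classes (`Record`, `Quanta`, `RecordQuanta`, and `elevate` are all defined in the example and are not Wayang's types), to show why the extra method only appears when the element type is Record.

```scala
import scala.language.implicitConversions

object EnrichmentSketch {
  // Hypothetical stand-ins, not Wayang's classes.
  final case class Record(fields: Map[String, Any])
  final class Quanta[Out](val elements: Seq[Out])

  // Extra operations that only make sense for Record elements.
  final class RecordQuanta(quanta: Quanta[Record]) {
    def projectRecords(fieldNames: Seq[String]): Quanta[Record] =
      new Quanta(quanta.elements.map { r =>
        Record(r.fields.filter { case (name, _) => fieldNames.contains(name) })
      })
  }

  // The conversion is defined for Quanta[Record] only, so other element types are unaffected.
  implicit def elevate(quanta: Quanta[Record]): RecordQuanta = new RecordQuanta(quanta)

  val sales = new Quanta(Seq(Record(Map("product_id" -> 7, "revenue" -> 12.5, "region" -> "EU"))))

  // The compiler inserts elevate(sales) because Quanta itself has no projectRecords method.
  val projected: Quanta[Record] = sales.projectRecords(Seq("product_id", "revenue"))
}
```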
MultiContextPlanBuilder — fan-out across WayangContexts
MultiContextPlanBuilder runs the same logical plan against multiple WayangContext instances simultaneously. This is useful for cross-cluster processing or A/B testing across different engine configurations.
import org.apache.wayang.api._
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
// MultiContext extends WayangContext — pass a Configuration, then register plugins.
val ctx1 = new MultiContext(new Configuration).withPlugin(Java.basicPlugin)
val ctx2 = new MultiContext(new Configuration).withPlugin(Spark.basicPlugin)
val multiBuilder = new MultiContextPlanBuilder(List(ctx1, ctx2))
.withUdfJarsOf(this.getClass)
// Read a text file in each context independently.
val results = multiBuilder
.readTextFile(ctx1, "file:///data/part-a.txt")
.readTextFile(ctx2, "file:///data/part-b.txt")
// Each context gets its own sink via MultiContext.withTextFileSink before calling execute().
ctx1.withTextFileSink("file:///output/part-a.txt")
ctx2.withTextFileSink("file:///output/part-b.txt")
// Apply transformations uniformly across all contexts, then run every plan.
results
.forEach(_.flatMap(_.split("\\W+")))
.forEach(_.filter(_.nonEmpty))
.execute()
MultiContextPlanBuilder key methods:
| Method | Description |
|---|---|
| readTextFile(ctx, url) | Reads a text file in the specified context. |
| readObjectFile[T](ctx, url) | Reads an object file in the specified context. |
| loadCollection[T](ctx, iterable) | Loads an in-memory collection in the specified context. |
| forEach[Out](f: PlanBuilder => DataQuanta[Out]) | Applies f to each context’s PlanBuilder; returns a MultiContextDataQuanta. |
| withUdfJarsOf(classes*) | Registers UDF JARs for all contexts. |
MultiContextDataQuanta key methods:
| Method | Description |
|---|---|
| forEach[NewOut](f: DataQuanta[Out] => DataQuanta[NewOut]) | Applies a transformation f to each context’s DataQuanta. |
| combineEach[ThatOut, NewOut](that, f) | Merges two MultiContextDataQuanta element-wise using f. |
| withTargetPlatforms(platforms*) | Restricts all contexts’ operators to the given platforms. |
| execute(timeout) | Executes all context plans in parallel, blocking until all finish. |
| mergeUnion(mergeContext) | Executes all contexts, then unions the results into a single DataQuanta. |
EdgeDataQuanta — graph processing
Import org.apache.wayang.api.graph._ to unlock graph-specific extensions. An Edge is a simple pair (source: Long, target: Long). A DataQuanta[Edge] is automatically enriched to an EdgeDataQuanta via implicit conversion.
import org.apache.wayang.api._
import org.apache.wayang.api.graph._
val edges: DataQuanta[Edge] = planBuilder
.readTextFile("file:///graph.tsv")
.map(line => { val parts = line.split("\t"); Edge(parts(0).toLong, parts(1).toLong) })
// Run PageRank for 20 iterations.
val pageRanks: DataQuanta[PageRank] = edges.pageRank(numIterations = 20)
pageRanks.collect().foreach(pr => println(s"Node ${pr.nodeId}: ${pr.rank}"))
def pageRank(
numIterations: Int = 20,
dampingFactor: Double = PageRankOperator.DEFAULT_DAMPING_FACTOR,
graphDensity: ProbabilisticDoubleInterval = PageRankOperator.DEFAULT_GRAPH_DENSITIY
): DataQuanta[PageRank]
The PageRank output type exposes .nodeId: Long and .rank: Float.
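To give a feel for what numIterations and dampingFactor control, here is a textbook power-iteration sketch on plain Scala collections. It is not Wayang's PageRankOperator: the function, the `Double` ranks, the 0.85 damping value, and the three-node graph are all assumptions made for illustration, and nodes without outgoing edges are ignored for brevity.

```scala
object PageRankSketch {
  // Textbook power iteration; nodes without outgoing edges are not handled.
  def pageRank(edges: Seq[(Long, Long)], numIterations: Int, dampingFactor: Double): Map[Long, Double] = {
    val nodes = edges.flatMap { case (s, t) => Seq(s, t) }.distinct
    val outDegree = edges.groupBy(_._1).map { case (s, es) => (s, es.size) }
    var ranks: Map[Long, Double] = nodes.map(n => (n, 1.0 / nodes.size)).toMap

    for (_ <- 1 to numIterations) {
      // Each node splits its current rank evenly among its outgoing edges.
      val contributions = edges
        .map { case (s, t) => (t, ranks(s) / outDegree(s)) }
        .groupBy(_._1)
        .map { case (t, cs) => (t, cs.map(_._2).sum) }

      // Mix the received rank with a uniform share, weighted by the damping factor.
      ranks = nodes.map { n =>
        (n, (1 - dampingFactor) / nodes.size + dampingFactor * contributions.getOrElse(n, 0.0))
      }.toMap
    }
    ranks
  }

  // A three-node cycle: by symmetry every node keeps a rank of one third.
  val ranks: Map[Long, Double] =
    pageRank(Seq((1L, 2L), (2L, 3L), (3L, 1L)), numIterations = 20, dampingFactor = 0.85)

  def main(args: Array[String]): Unit = println(ranks)
}
```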
Complete Scala WordCount example
This is the canonical Wayang WordCount pipeline taken directly from the project examples.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
object WordCountScala {
def main(args: Array[String]): Unit = {
val inputUrl = "file:///tmp/words.txt"
// Register execution platforms — Wayang's optimizer will pick per step.
val wayangContext = new WayangContext(new Configuration)
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
.withJobName(s"WordCount ($inputUrl)")
.withUdfJarsOf(this.getClass)
val wordcounts = planBuilder
// 1. Read the text file.
.readTextFile(inputUrl).withName("Load file")
// 2. Split each line on non-word characters.
.flatMap(_.split("\\W+"), selectivity = 10).withName("Split words")
// 3. Drop empty strings from leading/trailing punctuation.
.filter(_.nonEmpty, selectivity = 0.99).withName("Filter empty words")
// 4. Pair each word with count 1.
.map(word => (word.toLowerCase, 1)).withName("To lower case, add counter")
// 5. Sum counts per word.
.reduceByKey(_._1, (c1, c2) => (c1._1, c1._2 + c2._2)).withName("Sum counts")
.withCardinalityEstimator((in: Long) => math.round(in * 0.01))
// 6. Collect results to the driver.
.collect()
println(wordcounts)
}
}
Iterative k-means example
repeat enables iterative algorithms such as k-means. The pipeline reads a point cloud, initialises random centroids, and runs 100 iterations of the reassign-and-average loop.
import org.apache.wayang.api._
import org.apache.wayang.core.api.{Configuration, WayangContext}
import org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction
import org.apache.wayang.core.function.ExecutionContext
import org.apache.wayang.java.Java
import org.apache.wayang.spark.Spark
import scala.util.Random
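import scala.collection.JavaConversions._ // deprecated since Scala 2.12; supplies the implicit java.util.Collection => Iterable conversion used in open()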
object KMeans {
def main(args: Array[String]): Unit = {
val inputUrl = "file:///kmeans.txt"
val k = 5
val iterations = 100
val wayangContext = new WayangContext(new Configuration)
.withPlugin(Java.basicPlugin)
.withPlugin(Spark.basicPlugin)
val planBuilder = new PlanBuilder(wayangContext)
.withJobName(s"k-means ($inputUrl, k=$k, $iterations iterations)")
.withUdfJarsOf(this.getClass)
case class Point(x: Double, y: Double)
case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
def addPoints(that: TaggedPointCounter) =
TaggedPointCounter(x + that.x, y + that.y, cluster, count + that.count)
def average = TaggedPointCounter(x / count, y / count, cluster, 0)
}
val points = planBuilder.readTextFile(inputUrl)
.map { line => val f = line.split(","); Point(f(0).toDouble, f(1).toDouble) }
val random = new Random
val initialCentroids = planBuilder.loadCollection(
for (i <- 1 to k) yield TaggedPointCounter(random.nextGaussian(), random.nextGaussian(), i, 0)
)
class SelectNearestCentroid extends ExtendedSerializableFunction[Point, TaggedPointCounter] {
var centroids: Iterable[TaggedPointCounter] = _
override def open(ctx: ExecutionContext): Unit =
centroids = ctx.getBroadcast[TaggedPointCounter]("centroids")
override def apply(p: Point): TaggedPointCounter = {
var minDist = Double.PositiveInfinity
var nearestId = -1
for (c <- centroids) {
val d = math.sqrt(math.pow(p.x - c.x, 2) + math.pow(p.y - c.y, 2))
if (d < minDist) { minDist = d; nearestId = c.cluster }
}
TaggedPointCounter(p.x, p.y, nearestId, 1)
}
}
val finalCentroids = initialCentroids.repeat(iterations, { currentCentroids =>
points.mapJava(new SelectNearestCentroid)
.withBroadcast(currentCentroids, "centroids")
.withName("Find nearest centroid")
.reduceByKey(_.cluster, _.addPoints(_))
.withName("Add up points")
.map(_.average)
.withName("Average points")
}).withName("k-means loop").collect()
println(finalCentroids)
}
}
The ExtendedSerializableFunction pattern (with an open() hook that retrieves broadcast variables) is the recommended way to access side data inside map UDFs on distributed engines such as Spark.