PyWayang is Apache Wayang’s Python frontend. You write your pipeline entirely in Python — WayangContext, DataQuanta, and standard Python callables — and the framework serialises the operator graph as JSON, submits it to the Wayang REST endpoint, and executes it on whichever JVM-based platforms you have registered. Python UDFs that cannot be expressed in a platform’s native language are executed through a subprocess-based Python worker, giving you the full Python ecosystem inside a distributed pipeline.

Requirements and installation

Step 1: Install the Python package

The PyWayang package lives inside the python/ directory of the Wayang repository. Install it in development mode or build a wheel:
cd python
pip install -e .

Step 2: Start the Wayang REST server

PyWayang submits plans to the wayang-api-json REST endpoint. Build and start the server from the repository root:
mvn package -pl wayang-api/wayang-api-json -am -DskipTests
java -jar wayang-api/wayang-api-json/target/wayang-api-json-*-jar-with-dependencies.jar
By default the server listens on port 8080.

Step 3: Configure wayang.properties (optional)

Create a wayang.properties file or configure the Configuration object programmatically:
wayang.properties
# Port of the Wayang REST server
wayang.api.python.port = 8080

Python 3.8+ is required. The subprocess-based Python worker must also be available on every executor node for UDFs that cannot be transpiled to the target platform's native language.

WayangContext

WayangContext is the entry point for building Python pipelines. It holds the set of registered Plugin objects, each of which declares one or more execution platforms, and a Configuration that controls runtime properties.
from pywy.dataquanta import WayangContext
from pywy.core.core import Plugin
from pywy.core.platform import Platform

# Define a plugin for the Java platform
java_platform = Platform("Java")
java_plugin = Plugin(platforms={java_platform})

context = WayangContext()
context.register(java_plugin)

WayangContext constructor

class WayangContext:
    def __init__(self, configuration: Configuration = Configuration()):
        ...
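configuration (Configuration, optional): Runtime settings for the context. Use configuration.set_property(key, value) to set properties such as wayang.api.python.port. As written, the default Configuration() is evaluated once when the class is defined, so contexts created without an explicit configuration may share that single instance; pass your own to keep settings separate.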

register / unregister

context.register(*plugins: Plugin) -> WayangContext
context.unregister(*plugins: Plugin) -> WayangContext
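Adds or removes Plugin instances from the context. Plugins take effect when the plan is executed. Both methods return the context itself, so calls can be chained, e.g. WayangContext().register(java_plugin).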

Source factory methods

Source methods on WayangContext begin a pipeline by wrapping an input operator in a DataQuanta.

textfile

context.textfile(file_path: str) -> DataQuanta[str]
Reads a text file line by line. Each line becomes one str element.
lines = context.textfile("file:///data/corpus.txt")

parquet

context.parquet(
    file_path: str,
    projection: Optional[List[str]] = None,
    column_names: Optional[List[str]] = None
) -> DataQuanta[Record]
Reads a Parquet file and returns DataQuanta[Record]. Optionally pass projection to select a subset of columns by name, or column_names to rename fields.
records = context.parquet("hdfs:///warehouse/events.parquet",
                          projection=["user_id", "event_type"])

Configuration

Configuration is a simple key-value store passed to WayangContext.
from pywy.configuration import Configuration

config = Configuration()
config.set_property("wayang.api.python.port", "8080")

context = WayangContext(configuration=config)
| Property key | Default | Description |
|---|---|---|
| wayang.api.python.port | 8080 | TCP port of the Wayang JSON REST server. |
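Both the Configuration object and the wayang.properties file reduce to string key-value pairs. The sketch below is a hypothetical stand-in (KeyValueConfiguration, load_properties and submit_url are illustrative names, not pywy APIs) that parses a simplified properties file and derives the submission URL from wayang.api.python.port, assuming the server runs on localhost and uses the /wayang-api-json/submit-plan/json path quoted elsewhere on this page.

```python
from typing import Dict


class KeyValueConfiguration:
    """Hypothetical stand-in for pywy's Configuration: a plain str -> str store."""

    def __init__(self) -> None:
        self.entries: Dict[str, str] = {}

    def set_property(self, key: str, value: str) -> None:
        self.entries[key] = value

    def get_property(self, key: str, default: str) -> str:
        # Fall back to the documented default when the key was never set.
        return self.entries.get(key, default)


def load_properties(text: str) -> KeyValueConfiguration:
    """Parse simplified 'key = value' lines, skipping blanks and # comments.

    Real Java .properties files also allow ':' separators, escapes and line
    continuations; this sketch deliberately ignores those.
    """
    config = KeyValueConfiguration()
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        config.set_property(key.strip(), value.strip())
    return config


def submit_url(config: KeyValueConfiguration, host: str = "localhost") -> str:
    # Assumed URL layout, based on the endpoint quoted on this page.
    port = config.get_property("wayang.api.python.port", "8080")
    return f"http://{host}:{port}/wayang-api-json/submit-plan/json"


config = load_properties("# Port of the Wayang REST server\nwayang.api.python.port = 9090\n")
print(submit_url(config))  # http://localhost:9090/wayang-api-json/submit-plan/json
```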

DataQuanta transformations

DataQuanta[T] is the Python equivalent of the Scala DataQuanta. It holds a reference to an operator node in the execution graph. Transformations are lazy — no data moves until a sink is called.
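Laziness is easiest to see with a toy model. The hypothetical LazyPipeline class below is not part of pywy; it only mimics the idea that each transformation returns a new node recording what to do, and that nothing runs until a sink-like method (here run_sink, an illustrative name) is called.

```python
from typing import Any, Callable, List, Optional, Tuple

Step = Tuple[str, Callable[[Any], Any]]


class LazyPipeline:
    """Hypothetical toy model of lazy DataQuanta-style chaining (not pywy)."""

    def __init__(self, source: List[Any], steps: Optional[List[Step]] = None) -> None:
        self.source = source
        self.steps: List[Step] = steps or []

    def map(self, f: Callable[[Any], Any]) -> "LazyPipeline":
        # Record the step and return a new node; no element is touched yet.
        return LazyPipeline(self.source, self.steps + [("map", f)])

    def filter(self, p: Callable[[Any], bool]) -> "LazyPipeline":
        return LazyPipeline(self.source, self.steps + [("filter", p)])

    def run_sink(self) -> List[Any]:
        # Only the sink walks the recorded steps and actually moves data.
        data = list(self.source)
        for kind, fn in self.steps:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        return data


calls: List[int] = []


def traced_double(x: int) -> int:
    calls.append(x)  # records every real invocation
    return x * 2


pipeline = LazyPipeline([1, 2, 3]).map(traced_double).filter(lambda x: x > 2)
print(len(calls))           # 0: building the pipeline ran nothing
print(pipeline.run_sink())  # [4, 6]
```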

filter

def filter(self: DataQuanta[T],
           p: Predicate,
           input_type: GenericTco = None) -> DataQuanta[T]
Keeps elements for which the predicate p returns True.
words = lines.filter(lambda w: len(w) > 0)
p (Callable[[T], bool]): Predicate function; elements for which it returns True are kept.
input_type (GenericTco, optional): Python type hint for the input element. Inferred from the predicate signature if omitted.

map

def map(self: DataQuanta[In],
        f: Function,
        input_type: GenericTco = None,
        output_type: GenericTco = None) -> DataQuanta[Out]
Applies f to every element one-to-one.
pairs = words.map(lambda w: (w.lower(), 1))
f (Callable[[In], Out]): Transformation function applied to each element.
input_type (GenericTco, optional): Input element type hint.
output_type (GenericTco, optional): Output element type hint.

flatmap

def flatmap(self: DataQuanta[In],
            f: FlatmapFunction,
            input_type: GenericTco = None,
            output_type: GenericTco = None) -> DataQuanta[IterableOut]
Applies f to each element and flattens the resulting iterables into one stream.
words = lines.flatmap(lambda line: line.split())
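The difference between filter, map and flatmap can be checked locally with plain Python. These comprehensions only illustrate the element-level semantics described above; they say nothing about how PyWayang partitions or executes the work.

```python
from itertools import chain
from typing import List

lines: List[str] = ["To be or", "", "not to be"]

# filter: keep only elements for which the predicate returns True.
non_empty_lines = [line for line in lines if len(line) > 0]

# map: exactly one output per input, so each line becomes one list of words.
mapped = [line.split() for line in lines]

# flatmap: the per-line iterables are flattened into a single stream of words.
flattened = list(chain.from_iterable(line.split() for line in lines))

print(non_empty_lines)  # ['To be or', 'not to be']
print(mapped)           # [['To', 'be', 'or'], [], ['not', 'to', 'be']]
print(flattened)        # ['To', 'be', 'or', 'not', 'to', 'be']
```

Note that map keeps one (possibly empty) list per input line, while flatmap lets an empty line contribute nothing at all.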

reduce_by_key

def reduce_by_key(self: DataQuanta[In],
                  key_f: Function,
                  f: BiFunction,
                  input_type: GenericTco = None) -> DataQuanta[IterableOut]
Groups elements by the key extracted by key_f and reduces each group with the binary function f.
word_counts = pairs.reduce_by_key(
    lambda pair: pair[0],                          # key extractor
    lambda a, b: (a[0], a[1] + b[1])              # reducer
)
key_f (Callable[[In], Key]): Extracts the grouping key from each element.
f (Callable[[In, In], In]): Binary reduction function applied within each group.
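For intuition, the hypothetical helper local_reduce_by_key below reproduces the group-then-reduce semantics on an in-memory list. Because a distributed engine may combine partial results in any order, a reducer like the one in the example above is generally expected to be associative and commutative; summing counts is both.

```python
from functools import reduce
from typing import Callable, Dict, Hashable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")


def local_reduce_by_key(
    items: Iterable[T],
    key_f: Callable[[T], Hashable],
    f: Callable[[T, T], T],
) -> List[T]:
    """Hypothetical in-memory analogue of DataQuanta.reduce_by_key."""
    groups: Dict[Hashable, List[T]] = {}
    for item in items:
        groups.setdefault(key_f(item), []).append(item)
    # Fold each group with the binary reducer; a one-element group is returned as-is.
    return [reduce(f, group) for group in groups.values()]


pairs: List[Tuple[str, int]] = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
word_counts = local_reduce_by_key(
    pairs,
    lambda pair: pair[0],
    lambda a, b: (a[0], a[1] + b[1]),
)
print(word_counts)  # [('to', 2), ('be', 2), ('or', 1), ('not', 1)]
```

Group order here follows first appearance in the input; output order from a distributed run should not be assumed.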

sort

def sort(self: DataQuanta[In],
         key_f: Function,
         input_type: GenericTco = None) -> DataQuanta[IterableOut]
Sorts elements by the key extracted by key_f in ascending order.
sorted_counts = word_counts.sort(lambda pair: -pair[1])  # descending by count
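Negating the key only works for numeric keys, and elements with equal counts may come back in any order from a distributed sort. A composite tuple key addresses both; the plain-Python sorted calls below show the idea. Passing the same tuple key to sort() assumes the chosen platform can compare tuple keys, which this page does not confirm.

```python
from typing import List, Tuple

word_counts: List[Tuple[str, int]] = [("to", 2), ("be", 2), ("or", 1), ("not", 1)]

# Descending by count via negation, as in the sort() example above.
by_count = sorted(word_counts, key=lambda pair: -pair[1])

# Composite key: descending count, then ascending word to break ties deterministically.
by_count_then_word = sorted(word_counts, key=lambda pair: (-pair[1], pair[0]))

print(by_count)            # [('to', 2), ('be', 2), ('or', 1), ('not', 1)]
print(by_count_then_word)  # [('be', 2), ('to', 2), ('not', 1), ('or', 1)]
```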

join

def join(self: DataQuanta[In],
         this_key_f: Function,
         that: DataQuanta[In],
         that_key_f: Function,
         input_type: GenericTco = None) -> DataQuanta[Out]
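Equi-join: matches each element of this dataset with the elements of that whose key, extracted by that_key_f, equals the key extracted by this_key_f.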
joined = users.join(
    lambda u: u["id"],
    orders,
    lambda o: o["user_id"]
)
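The join semantics can be sketched as an in-memory hash join. local_equi_join is a hypothetical helper, and emitting (left, right) tuples is an assumption about the shape of joined elements rather than documented pywy behaviour.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, List, Tuple


def local_equi_join(
    left: Iterable[Any],
    left_key: Callable[[Any], Hashable],
    right: Iterable[Any],
    right_key: Callable[[Any], Hashable],
) -> List[Tuple[Any, Any]]:
    """Hypothetical hash join: index the right side, then probe with the left."""
    index: Dict[Hashable, List[Any]] = {}
    for candidate in right:
        index.setdefault(right_key(candidate), []).append(candidate)
    # Each left element is paired with every right element sharing its key;
    # left elements without a match are dropped (inner-join semantics).
    return [(item, match) for item in left for match in index.get(left_key(item), [])]


users = [{"id": 1, "name": "ada"}, {"id": 2, "name": "bob"}]
orders = [
    {"user_id": 1, "item": "book"},
    {"user_id": 1, "item": "pen"},
    {"user_id": 3, "item": "cup"},
]
joined = local_equi_join(users, lambda u: u["id"], orders, lambda o: o["user_id"])
print([(u["name"], o["item"]) for u, o in joined])  # [('ada', 'book'), ('ada', 'pen')]
```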

cartesian

def cartesian(self: DataQuanta[In],
              that: DataQuanta[In],
              input_type: GenericTco = None) -> DataQuanta[Out]
Computes the Cartesian product of two datasets.
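In plain Python the same product comes from itertools.product. The output size is the product of the input sizes, which is why cartesian on two large datasets is expensive; representing each result as a (left, right) tuple is an assumption of this sketch.

```python
from itertools import product

left = ["a", "b"]
right = [1, 2, 3]

# Every element of the first dataset paired with every element of the second.
pairs = list(product(left, right))

print(len(pairs))  # 6, i.e. len(left) * len(right)
print(pairs[:3])   # [('a', 1), ('a', 2), ('a', 3)]
```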

Sink operators

Sinks compile the operator graph, serialise it as JSON, post it to the Wayang REST server, and block until execution completes.

store_textfile

def store_textfile(self: DataQuanta[In],
                   path: str,
                   input_type: GenericTco = None) -> None
Writes each element as one line of a text file at path. Like every sink, calling it compiles, submits and executes the whole pipeline.
word_counts.store_textfile("file:///output/wordcount.txt")

store_textfile is currently the only built-in sink in PyWayang. To collect results on the driver, use the Wayang Java or Scala API with .collect() instead.

Python UDF execution model

When a Python UDF (lambda or function) cannot be translated into the target platform’s native operation (e.g., a Spark RDD map), Wayang serialises the function using cloudpickle and runs it inside a subprocess Python worker co-located with the executor.
Python process                    JVM (Wayang executor)
─────────────────                 ──────────────────────────────────
WayangContext.textfile()
  → DataQuanta.flatmap()          Operator graph built in Python
  → DataQuanta.store_textfile()

PywyPlan.execute()
  → JSON serialisation            POST /wayang-api-json/submit-plan/json
                                  ┌──────────────────────────────────┐
                                  │  Wayang optimizer + planner      │
                                  │  Dispatches to Java Streams /    │
                                  │  Spark / ...                     │
                                  │                                  │
                                  │  Python UDFs: subprocess worker  │
                                  │  receives serialised function +  │
                                  │  data partition, returns results │
                                  └──────────────────────────────────┘
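The worker round trip can be sketched with the standard library alone. This is not Wayang's implementation: it swaps cloudpickle for marshal, which serialises only the function's bytecode (so closures and referenced globals are not carried over, and both sides must run the same Python version), and it exchanges data as JSON over stdin/stdout. run_udf_in_subprocess and WORKER_SOURCE are hypothetical names.

```python
import json
import marshal
import subprocess
import sys
from typing import Any, Callable, List

# Program executed by the worker: rebuild the function from its bytecode,
# apply it to every element of the partition, and print JSON results.
WORKER_SOURCE = """
import json, marshal, sys, types
payload = json.loads(sys.stdin.read())
code = marshal.loads(bytes.fromhex(payload["code"]))
func = types.FunctionType(code, {"__builtins__": __builtins__})
sys.stdout.write(json.dumps([func(x) for x in payload["partition"]]))
"""


def run_udf_in_subprocess(func: Callable[[Any], Any], partition: List[Any]) -> List[Any]:
    """Hypothetical one-shot worker: ship bytecode plus data, read results back."""
    if func.__closure__:
        raise ValueError("this marshal-based sketch cannot ship closures")
    payload = json.dumps({
        "code": marshal.dumps(func.__code__).hex(),
        "partition": partition,
    })
    completed = subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE],
        input=payload,
        capture_output=True,
        text=True,
        check=True,  # raise if the worker process fails
    )
    return json.loads(completed.stdout)


print(run_udf_in_subprocess(lambda w: w.upper(), ["to", "be"]))  # ['TO', 'BE']
```

cloudpickle exists precisely to remove the closure and globals limitations shown here, which is why a real worker would use it rather than marshal.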

PywyPlan.execute internals

PywyPlan traverses the operator DAG from the sink back to the sources, groups consecutive UnaryToUnaryOperator nodes into a single MapPartitions batch (to minimise cross-process round-trips), serialises everything as JSON, and POSTs to the Wayang server.
from pywy.core.core import PywyPlan

# Constructed automatically when you call store_textfile(); shown only for
# illustration. sink_operator stands for the sink node created by that call.
plan = PywyPlan(context.plugins, context.configuration, sinks=[sink_operator])
plan.execute()
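The batching step can be illustrated with a hypothetical fuse_unary_ops helper that composes a chain of map, filter and flatmap steps into one function per partition. If each step ran in its own worker, a partition would cross the process boundary once per operator; after fusion it crosses once. The (kind, function) operator encoding is invented for this sketch and is not PywyPlan's internal representation.

```python
from itertools import chain
from typing import Any, Callable, Iterable, List, Tuple

# Hypothetical operator encoding: (kind, user function).
UnaryOp = Tuple[str, Callable[[Any], Any]]


def fuse_unary_ops(ops: List[UnaryOp]) -> Callable[[Iterable[Any]], List[Any]]:
    """Compose consecutive unary operators into a single per-partition function."""

    def run_partition(partition: Iterable[Any]) -> List[Any]:
        data: Iterable[Any] = iter(partition)
        for kind, fn in ops:
            # map()/filter() receive fn as an argument immediately, so each lazy
            # stage keeps its own function even though the loop rebinds fn.
            if kind == "map":
                data = map(fn, data)
            elif kind == "filter":
                data = filter(fn, data)
            elif kind == "flatmap":
                data = chain.from_iterable(map(fn, data))
            else:
                raise ValueError(f"not a unary operator: {kind}")
        return list(data)

    return run_partition


word_pairs = fuse_unary_ops([
    ("flatmap", lambda line: line.split()),
    ("filter", lambda word: len(word) > 0),
    ("map", lambda word: (word.lower(), 1)),
])
print(word_pairs(["To be", "or not"]))  # [('to', 1), ('be', 1), ('or', 1), ('not', 1)]
```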

Complete Python WordCount example

from pywy.dataquanta import WayangContext
from pywy.core.core import Plugin
from pywy.core.platform import Platform
from pywy.configuration import Configuration

# 1. Configure the runtime.
config = Configuration()
config.set_property("wayang.api.python.port", "8080")

# 2. Register the Java platform plugin.
java_plugin = Plugin(platforms={Platform("Java")})

context = WayangContext(configuration=config)
context.register(java_plugin)

# 3. Build the word-count pipeline.
(
    context
    .textfile("file:///data/corpus.txt")             # source: one String per line
    .flatmap(lambda line: line.split())              # split into words
    .filter(lambda word: len(word) > 0)              # drop empty strings
    .map(lambda word: (word.lower(), 1))             # attach counter
    .reduce_by_key(
        lambda pair: pair[0],                        # key = the word
        lambda a, b: (a[0], a[1] + b[1])            # sum the counts
    )
    .store_textfile("file:///output/wordcount.txt")  # sink: triggers execution
)

Run the Wayang JSON server before executing this script. The pipeline posts to http://localhost:8080/wayang-api-json/submit-plan/json by default.
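Before running the script it can help to confirm that something is listening on the configured port. server_is_listening is a hypothetical helper; it only proves that a TCP connection succeeds, not that the listener is the Wayang JSON server.

```python
import socket


def server_is_listening(host: str = "localhost", port: int = 8080, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution errors.
        return False


if not server_is_listening():
    print("Nothing is listening on localhost:8080; start the Wayang JSON server first.")
```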

Machine learning operators

PyWayang exposes higher-level ML training operators directly on DataQuanta. These delegate to the corresponding JVM operator implementations.

train_logistic_regression

def train_logistic_regression(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    fit_intercept: bool = True
) -> DataQuanta[Out]
Trains a logistic regression model. self holds feature vectors; labels holds binary labels (0.0 or 1.0).

train_decision_tree_regression

def train_decision_tree_regression(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    max_depth: int = 5,
    min_instances: int = 2
) -> DataQuanta[Out]
Trains a decision tree regression model.

train_linear_svc

def train_linear_svc(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    max_iter: int = 10,
    reg_param: float = 0.1
) -> DataQuanta[Out]
Trains a Linear Support Vector Classifier.

predict

def predict(
    self: DataQuanta[In],          # model DataQuanta
    that: DataQuanta[In],          # feature DataQuanta
    input_type: GenericTco,
    output_type: GenericTco
) -> DataQuanta[Out]
Runs inference: applies the model in self to the feature data in that.
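As an illustration of what inference means for the logistic-regression operator, the sketch below scores one feature vector with a weight vector and intercept and thresholds the probability at 0.5 to produce a 0.0/1.0 label. How Wayang represents the trained model inside the model DataQuanta is not described here, so the (weights, intercept) form and the logistic_predict name are assumptions; a model trained with fit_intercept=False would simply have an intercept of 0.0.

```python
import math
from typing import Sequence


def sigmoid(z: float) -> float:
    # Numerically stable logistic function: avoids overflow in math.exp for large |z|.
    if z >= 0:
        return 1.0 / (1.0 + math.exp(-z))
    e = math.exp(z)
    return e / (1.0 + e)


def logistic_predict(
    weights: Sequence[float],
    intercept: float,
    features: Sequence[float],
    threshold: float = 0.5,
) -> float:
    """Hypothetical scorer: returns 1.0 if P(label = 1) >= threshold, else 0.0."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 if sigmoid(z) >= threshold else 0.0


model = ([0.8, -0.5], 0.1)  # assumed (weights, intercept) for two features
print(logistic_predict(*model, [2.0, 1.0]))   # z is about 1.2, so 1.0
print(logistic_predict(*model, [-1.0, 2.0]))  # z is about -1.7, so 0.0
```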

Plugin and Platform model

A Plugin is a thin wrapper over a set of Platform instances. The Platform class holds a platform name that matches the JVM-side platform registration key.
from pywy.core.core import Plugin
from pywy.core.platform import Platform

java_plugin  = Plugin(platforms={Platform("Java")})
spark_plugin = Plugin(platforms={Platform("Spark")})

context.register(java_plugin, spark_plugin)
When PywyPlan.execute() serialises the plan, it collects the platform names from every registered plugin and lists them in the context.platforms JSON array so the Wayang server knows which engines it may consider.
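The platform-collection step can be sketched as follows. Only the context.platforms array is taken from the description above; the operators key, modelling each plugin as a set of platform-name strings, and the build_plan_payload and build_submit_request helpers are all hypothetical. The request is constructed but not sent.

```python
import json
import urllib.request
from typing import Any, Dict, Iterable, List, Set


def build_plan_payload(plugins: Iterable[Set[str]], operators: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Gather platform names from every plugin into context.platforms (hypothetical schema)."""
    platforms = sorted({name for plugin in plugins for name in plugin})
    return {"context": {"platforms": platforms}, "operators": operators}


def build_submit_request(
    payload: Dict[str, Any], host: str = "localhost", port: int = 8080
) -> urllib.request.Request:
    # Endpoint path as quoted on this page; the request is only built here, not sent.
    url = f"http://{host}:{port}/wayang-api-json/submit-plan/json"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


java_plugin = {"Java"}
spark_plugin = {"Spark"}
payload = build_plan_payload([java_plugin, spark_plugin], operators=[])
request = build_submit_request(payload)
print(payload["context"]["platforms"])         # ['Java', 'Spark']
print(request.get_method(), request.full_url)  # POST http://localhost:8080/wayang-api-json/submit-plan/json
```

Sending it against a running server would be a matter of urllib.request.urlopen(request).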

Operator reference table

| DataQuanta method | Input | Output | Description |
|---|---|---|---|
| filter(p) | T | T | Keep elements matching the predicate. |
| map(f) | In | Out | One-to-one transformation. |
| flatmap(f) | In | IterableOut | One-to-many transformation, flattened. |
| reduce_by_key(key_f, f) | In | IterableOut | Group and reduce by key. |
| sort(key_f) | In | IterableOut | Sort by key. |
| join(this_key_f, that, that_key_f) | In | Out | Equi-join two datasets. |
| cartesian(that) | In | Out | Cartesian product. |
| store_textfile(path) | In | None (sink) | Write to a text file (sink). |