PyWayang is Apache Wayang’s Python frontend. You write your pipeline entirely in Python — WayangContext, DataQuanta, and standard Python callables — and the framework serialises the operator graph as JSON, submits it to the Wayang REST endpoint, and executes it on whichever JVM-based platforms you have registered. Python UDFs that cannot be expressed in a platform’s native language are executed through a subprocess-based Python worker, giving you the full Python ecosystem inside a distributed pipeline.
Requirements and installation
Install Python package
The PyWayang package lives inside the python/ directory of the Wayang repository. Install it in development mode or build a wheel:
cd python
pip install -e .
Start the Wayang REST server
PyWayang submits plans to the wayang-api-json REST endpoint. Build and start the server from the repository root:
mvn package -pl wayang-api/wayang-api-json -am -DskipTests
java -jar wayang-api/wayang-api-json/target/wayang-api-json-*-jar-with-dependencies.jar
By default the server listens on port 8080.
Configure wayang.properties (optional)
Create a wayang.properties file or configure the Configuration object programmatically:
# Port of the Wayang REST server
wayang.api.python.port = 8080
Python 3.8+ is required. The subprocess-based Python worker must also be available on every executor node for UDFs that cannot be transpiled to the target platform’s native language.
WayangContext
WayangContext is the entry point for building Python pipelines. It holds the set of registered Plugin objects, each of which declares one or more execution platforms, and a Configuration that controls runtime properties.
from pywy.dataquanta import WayangContext
from pywy.core.core import Plugin
from pywy.core.platform import Platform
# Define a plugin for the Java platform
java_platform = Platform("Java")
java_plugin = Plugin(platforms={java_platform})
context = WayangContext()
context.register(java_plugin)
WayangContext constructor
class WayangContext:
    def __init__(self, configuration: Configuration = Configuration()):
        ...
configuration: Optional Configuration instance. Use configuration.set_property(key, value) to set runtime properties such as wayang.api.python.port.
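Python evaluates default argument values once, when the function is defined. If the real constructor matches the signature printed above (an assumption; the implementation may differ), contexts created without an argument would share one Configuration object. The sketch below uses stand-in classes, not pywy itself, to show the effect and the explicit-argument workaround.

```python
class Configuration:
    """Stand-in for pywy's Configuration: a plain key-value store."""

    def __init__(self):
        self._props = {}

    def set_property(self, key, value):
        self._props[key] = value

    def get_property(self, key):
        return self._props.get(key)


class Context:
    """Hypothetical context that mirrors the constructor signature shown above."""

    def __init__(self, configuration: Configuration = Configuration()):
        self.configuration = configuration


# Two contexts built with the default argument share the same object.
a = Context()
b = Context()
a.configuration.set_property("wayang.api.python.port", "9090")
shared = b.configuration.get_property("wayang.api.python.port")

# Passing a fresh Configuration keeps the settings isolated.
c = Context(configuration=Configuration())
isolated = c.configuration.get_property("wayang.api.python.port")
```

Passing an explicit Configuration, as the examples on this page do, avoids the question entirely.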
register / unregister
context.register(*plugins: Plugin) -> WayangContext
context.unregister(*plugins: Plugin) -> WayangContext
Adds or removes Plugin instances from the context. Plugins are applied when the plan is executed.
Source factory methods
Source methods on WayangContext begin a pipeline by wrapping an input operator in a DataQuanta.
textfile
context.textfile(file_path: str) -> DataQuanta[str]
Reads a text file line by line. Each line becomes one str element.
lines = context.textfile("file:///data/corpus.txt")
parquet
context.parquet(
    file_path: str,
    projection: Optional[List[str]] = None,
    column_names: Optional[List[str]] = None
) -> DataQuanta[Record]
Reads a Parquet file and returns DataQuanta[Record]. Optionally pass projection to select a subset of columns by name, or column_names to rename fields.
records = context.parquet("hdfs:///warehouse/events.parquet",
                          projection=["user_id", "event_type"])
Configuration
Configuration is a simple key-value store passed to WayangContext.
from pywy.configuration import Configuration
config = Configuration()
config.set_property("wayang.api.python.port", "8080")
context = WayangContext(configuration=config)
| Property key | Default | Description |
|---|---|---|
| wayang.api.python.port | 8080 | TCP port of the Wayang JSON REST server. |
DataQuanta
DataQuanta[T] is the Python equivalent of the Scala DataQuanta. It holds a reference to an operator node in the execution graph. Transformations are lazy: no data moves until a sink is called.
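Lazy evaluation can be illustrated with a toy class that only records operators and runs them when a sink is called. This is a minimal sketch, not PyWayang's implementation: LazyQuanta and its collect() sink are hypothetical names invented for the illustration.

```python
class LazyQuanta:
    """Toy stand-in for DataQuanta: records operators, runs nothing until a sink."""

    def __init__(self, source, ops=()):
        self._source = source
        self._ops = tuple(ops)

    def map(self, f):
        return LazyQuanta(self._source, self._ops + (("map", f),))

    def filter(self, p):
        return LazyQuanta(self._source, self._ops + (("filter", p),))

    def collect(self):
        """Hypothetical sink for this sketch only."""
        data = iter(self._source)
        for kind, fn in self._ops:
            data = map(fn, data) if kind == "map" else filter(fn, data)
        return list(data)


calls = []


def traced_upper(s):
    calls.append(s)  # record every invocation of the UDF
    return s.upper()


pipeline = LazyQuanta(["a", "", "b"]).filter(lambda s: len(s) > 0).map(traced_upper)
calls_before_sink = len(calls)  # the UDF has not run yet
result = pipeline.collect()     # the sink triggers execution
```

Building the pipeline never calls traced_upper; only collect() does.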
filter
def filter(self: DataQuanta[T],
           p: Predicate,
           input_type: GenericTco = None) -> DataQuanta[T]
Keeps elements for which the predicate p returns True.
words = lines.filter(lambda w: len(w) > 0)
p: Predicate function; elements for which it returns True are kept.
input_type: Python type hint for the input element. Inferred from the predicate signature if omitted.
map
def map(self: DataQuanta[In],
        f: Function,
        input_type: GenericTco = None,
        output_type: GenericTco = None) -> DataQuanta[Out]
Applies f to every element one-to-one.
pairs = words.map(lambda w: (w.lower(), 1))
f: Transformation function applied to each element.
output_type: Output element type hint.
flatmap
def flatmap(self: DataQuanta[In],
            f: FlatmapFunction,
            input_type: GenericTco = None,
            output_type: GenericTco = None) -> DataQuanta[IterableOut]
Applies f to each element and flattens the resulting iterables into one stream.
words = lines.flatmap(lambda line: line.split())
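The flattening behaviour can be reproduced locally with the standard library. The helper below, flatmap_local, is a hypothetical name for a pure-Python sketch of the semantics; it does not call PyWayang.

```python
from itertools import chain


def flatmap_local(f, elements):
    """Apply f to each element and concatenate the returned iterables."""
    return list(chain.from_iterable(f(e) for e in elements))


lines = ["to be or", "", "not to be"]
words = flatmap_local(lambda line: line.split(), lines)
```

Note that str.split() without arguments never yields empty strings, so the empty line contributes no elements to the output.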
reduce_by_key
def reduce_by_key(self: DataQuanta[In],
                  key_f: Function,
                  f: BiFunction,
                  input_type: GenericTco = None) -> DataQuanta[IterableOut]
Groups elements by the key extracted by key_f and reduces each group with the binary function f.
word_counts = pairs.reduce_by_key(
    lambda pair: pair[0],             # key extractor
    lambda a, b: (a[0], a[1] + b[1])  # reducer
)
key_f: Extracts the grouping key from each element.
f: Binary reduction function applied within each group.
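As a local illustration of the grouping-and-reducing semantics, the following pure-Python sketch folds elements that share a key with the binary function. reduce_by_key_local is a hypothetical helper, and the order of the returned groups here (first appearance of each key) is a property of this sketch, not something this page specifies for PyWayang.

```python
def reduce_by_key_local(key_f, f, elements):
    """Fold all elements that share a key into one element using f."""
    groups = {}
    for element in elements:
        key = key_f(element)
        # First element of a group is kept as-is; later ones are merged into it.
        groups[key] = f(groups[key], element) if key in groups else element
    return list(groups.values())


pairs = [("a", 1), ("b", 1), ("a", 1)]
counts = reduce_by_key_local(
    lambda pair: pair[0],              # key extractor
    lambda a, b: (a[0], a[1] + b[1]),  # reducer keeps the (word, count) shape
    pairs,
)
```

The reducer returns a value of the same shape as its inputs, which is why the example reducer rebuilds the (word, count) tuple rather than returning only the sum.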
sort
def sort(self: DataQuanta[In],
         key_f: Function,
         input_type: GenericTco = None) -> DataQuanta[IterableOut]
Sorts elements by the key extracted by key_f in ascending order.
sorted_counts = word_counts.sort(lambda pair: -pair[1]) # descending by count
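The negated-key trick for descending order works the same way with Python's built-in sorted, which also sorts ascending by key. A small local sketch on made-up sample data:

```python
word_counts = [("be", 2), ("or", 1), ("to", 3)]

# Ascending by count: the key is used as-is.
ascending = sorted(word_counts, key=lambda pair: pair[1])

# Descending by count: negating a numeric key reverses the order.
descending = sorted(word_counts, key=lambda pair: -pair[1])
```

Negation only applies to numeric keys; a string key cannot be inverted this way.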
join
def join(self: DataQuanta[In],
         this_key_f: Function,
         that: DataQuanta[In],
         that_key_f: Function,
         input_type: GenericTco = None) -> DataQuanta[Out]
Equi-join on keys extracted from both sides.
joined = users.join(
    lambda u: u["id"],
    orders,
    lambda o: o["user_id"]
)
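An equi-join matches elements whose extracted keys are equal. The sketch below shows one common way to compute it locally, a hash join, using sample records invented for the illustration. equi_join_local is a hypothetical helper, and returning matches as (left, right) tuples is an assumption; this page does not state the element shape PyWayang produces.

```python
from collections import defaultdict


def equi_join_local(left, left_key_f, right, right_key_f):
    """Hash join: index the right side by key, then probe with the left side."""
    index = defaultdict(list)
    for r in right:
        index[right_key_f(r)].append(r)
    return [(l, r) for l in left for r in index.get(left_key_f(l), [])]


users = [{"id": 1, "name": "ada"}, {"id": 2, "name": "lin"}]
orders = [
    {"user_id": 1, "total": 30},
    {"user_id": 1, "total": 12},
    {"user_id": 3, "total": 5},
]
joined = equi_join_local(users, lambda u: u["id"], orders, lambda o: o["user_id"])
```

Users without orders and orders without a matching user are dropped, as in an inner join.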
cartesian
def cartesian(self: DataQuanta[In],
              that: DataQuanta[In],
              input_type: GenericTco = None) -> DataQuanta[Out]
Computes the Cartesian product of two datasets.
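For intuition, the Cartesian product pairs every element of one dataset with every element of the other, so the output size is the product of the input sizes. A local sketch with itertools.product on made-up data; the tuple shape of the output elements is an assumption, not something this page specifies for PyWayang.

```python
from itertools import product

sizes = ["S", "M"]
colours = ["red", "blue", "green"]

# Every (size, colour) combination: 2 * 3 = 6 pairs.
combinations = list(product(sizes, colours))
```

Because the output grows multiplicatively, the operator is best applied after filtering both inputs.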
Sink operators
Sinks compile the operator graph, serialise it as JSON, post it to the Wayang REST server, and block until execution completes.
store_textfile
def store_textfile(self: DataQuanta[In],
                   path: str,
                   input_type: GenericTco = None) -> None
Writes each element as a line in a text file. This is the primary sink method in PyWayang.
word_counts.store_textfile("file:///output/wordcount.txt")
store_textfile is currently the only built-in sink in PyWayang. To collect results on the driver, use the Wayang Java or Scala API with .collect() instead.
Python UDF execution model
When a Python UDF (lambda or function) cannot be translated into the target platform’s native operation (e.g., a Spark RDD map), Wayang serialises the function using cloudpickle and runs it inside a subprocess Python worker co-located with the executor.
Python process                      JVM (Wayang executor)
─────────────────                   ──────────────────────────────────
WayangContext.textfile()
  → DataQuanta.flatmap()            Operator graph built in Python
  → DataQuanta.store_textfile()
PywyPlan.execute()
  → JSON serialisation              POST /wayang-api-json/submit-plan/json
                                    ┌──────────────────────────────────┐
                                    │ Wayang optimizer + planner       │
                                    │ Dispatches to Java Streams /     │
                                    │ Spark / ...                      │
                                    │                                  │
                                    │ Python UDFs: subprocess worker   │
                                    │ receives serialised function +   │
                                    │ data partition, returns results  │
                                    └──────────────────────────────────┘
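The worker model in the diagram (a serialised function plus a data partition go in, results come out) can be mimicked with the standard library alone. This is a simplified sketch under stated assumptions: it ships the function's code object with marshal as a stand-in for cloudpickle, exchanges newline-delimited text over stdin/stdout, and uses the hypothetical names WORKER_SOURCE and run_in_worker. The real wire protocol between the JVM and the Python worker is not described on this page.

```python
import base64
import marshal
import subprocess
import sys

# Program run in the separate Python process. It rebuilds the function from
# its marshalled code object and applies it to every line read from stdin.
WORKER_SOURCE = r"""
import base64, marshal, sys, types
code = marshal.loads(base64.b64decode(sys.argv[1]))
udf = types.FunctionType(code, globals())
for line in sys.stdin:
    print(udf(line.rstrip("\n")))
"""


def run_in_worker(udf, partition):
    """Send a closure-free function and a partition of strings to a worker."""
    payload = base64.b64encode(marshal.dumps(udf.__code__)).decode("ascii")
    completed = subprocess.run(
        [sys.executable, "-c", WORKER_SOURCE, payload],
        input="".join(item + "\n" for item in partition),
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.splitlines()


result = run_in_worker(lambda s: s.upper(), ["spark", "flink"])
```

Unlike cloudpickle, marshal does not capture closures or referenced globals, so this sketch only works for self-contained functions run by the same Python version.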
PywyPlan.execute internals
PywyPlan traverses the operator DAG from the sink back to the sources, groups consecutive UnaryToUnaryOperator nodes into a single MapPartitions batch (to minimise cross-process round-trips), serialises everything as JSON, and POSTs to the Wayang server.
from pywy.core.core import PywyPlan
# Constructed automatically when you call store_textfile().
plan = PywyPlan(context.plugins, context.configuration, sinks=[sink_operator])
plan.execute()
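The idea of grouping consecutive unary operators into one per-partition function can be sketched as plain function composition. The (kind, function) tuples and the fuse helper below are hypothetical; they illustrate the batching idea only and are not PywyPlan's actual data structures.

```python
from itertools import chain


def fuse(ops):
    """Compose ("map" | "filter" | "flatmap", fn) steps into one partition function."""

    def run_partition(partition):
        data = iter(partition)
        for kind, fn in ops:
            if kind == "map":
                data = map(fn, data)
            elif kind == "filter":
                data = filter(fn, data)
            elif kind == "flatmap":
                data = chain.from_iterable(map(fn, data))
            else:
                raise ValueError(f"unsupported operator kind: {kind}")
        return data

    return run_partition


ops = [
    ("flatmap", str.split),             # split lines into words
    ("filter", lambda w: len(w) > 2),   # keep words longer than two characters
    ("map", str.lower),                 # normalise case
]
run = fuse(ops)
output = list(run(["The quick fox", "A lazy DOG"]))
```

With the three steps fused, a partition crosses the process boundary once instead of once per operator.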
Complete Python WordCount example
from pywy.dataquanta import WayangContext
from pywy.core.core import Plugin
from pywy.core.platform import Platform
from pywy.configuration import Configuration
# 1. Configure the runtime.
config = Configuration()
config.set_property("wayang.api.python.port", "8080")
# 2. Register the Java platform plugin.
java_plugin = Plugin(platforms={Platform("Java")})
context = WayangContext(configuration=config)
context.register(java_plugin)
# 3. Build the word-count pipeline.
(
    context
    .textfile("file:///data/corpus.txt")             # source: one str per line
    .flatmap(lambda line: line.split())              # split into words
    .filter(lambda word: len(word) > 0)              # drop empty strings
    .map(lambda word: (word.lower(), 1))             # attach counter
    .reduce_by_key(
        lambda pair: pair[0],                        # key = the word
        lambda a, b: (a[0], a[1] + b[1])             # sum the counts
    )
    .store_textfile("file:///output/wordcount.txt")  # sink: triggers execution
)
Run the Wayang JSON server before executing this script. The pipeline posts to http://localhost:8080/wayang-api-json/submit-plan/json by default.
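To make the submission step concrete, the sketch below builds (but does not send) an HTTP POST request for the endpoint named above, using only the standard library. build_submit_request is a hypothetical helper and the plan body is a placeholder; the JSON schema the server expects is not documented on this page.

```python
import json
import urllib.request


def build_submit_request(plan, host="localhost", port="8080"):
    """Build a POST request for the submit-plan endpoint without sending it."""
    url = f"http://{host}:{port}/wayang-api-json/submit-plan/json"
    body = json.dumps(plan).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder plan body; the real structure is produced by PyWayang.
request = build_submit_request({"operators": []})

# urllib.request.urlopen(request) would send it; omitted so the sketch runs offline.
```

If wayang.api.python.port is changed in the Configuration, the port in the URL changes accordingly.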
Machine learning operators
PyWayang exposes higher-level ML training operators directly on DataQuanta. These delegate to the corresponding JVM operator implementations.
train_logistic_regression
def train_logistic_regression(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    fit_intercept: bool = True
) -> DataQuanta[Out]
Trains a logistic regression model. self holds feature vectors; labels holds binary labels (0.0 or 1.0).
train_decision_tree_regression
def train_decision_tree_regression(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    max_depth: int = 5,
    min_instances: int = 2
) -> DataQuanta[Out]
Trains a decision tree regression model.
train_linear_svc
def train_linear_svc(
    self: DataQuanta[In],
    labels: DataQuanta[In],
    max_iter: int = 10,
    reg_param: float = 0.1
) -> DataQuanta[Out]
Trains a Linear Support Vector Classifier.
predict
def predict(
    self: DataQuanta[In],    # model DataQuanta
    that: DataQuanta[In],    # feature DataQuanta
    input_type: GenericTco,
    output_type: GenericTco
) -> DataQuanta[Out]
Runs inference: applies the model in self to the feature data in that.
Plugins and platforms
A Plugin is a thin wrapper over a set of Platform instances. The Platform class holds a platform name that matches the JVM-side platform registration key.
from pywy.core.core import Plugin
from pywy.core.platform import Platform
java_plugin = Plugin(platforms={Platform("Java")})
spark_plugin = Plugin(platforms={Platform("Spark")})
context.register(java_plugin, spark_plugin)
When PywyPlan.execute() serialises the plan it extracts the platform name from each plugin and includes it in the context.platforms JSON array so the Wayang server knows which engines to consider.
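The extraction of platform names can be sketched with stand-in classes. Only the context.platforms array comes from the description above; the Platform and Plugin classes here are simplified stand-ins rather than the pywy ones, platform_names is a hypothetical helper, and the rest of the payload is left out because its schema is not given on this page.

```python
import json


class Platform:
    """Stand-in: holds only the platform name."""

    def __init__(self, name):
        self.name = name


class Plugin:
    """Stand-in: wraps a set of Platform instances."""

    def __init__(self, platforms):
        self.platforms = platforms


def platform_names(plugins):
    """Collect the distinct platform names declared by the given plugins."""
    names = {platform.name for plugin in plugins for platform in plugin.platforms}
    return sorted(names)  # sorted only to make the output deterministic


plugins = [Plugin({Platform("Java")}), Plugin({Platform("Spark")})]
payload = {"context": {"platforms": platform_names(plugins)}}
serialised = json.dumps(payload)
```

Registering a platform twice through different plugins yields a single entry, since the names are collected into a set first.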
Operator reference table
| DataQuanta method | Input | Output | Description |
|---|---|---|---|
| filter(p) | T | T | Keep elements matching predicate. |
| map(f) | In | Out | One-to-one transformation. |
| flatmap(f) | In | IterableOut | One-to-many transformation, flattened. |
| reduce_by_key(key_f, f) | In | IterableOut | Group and reduce by key. |
| sort(key_f) | In | IterableOut | Sort by key. |
| join(this_key_f, that, that_key_f) | In | Out | Equi-join two datasets. |
| cartesian(that) | In | Out | Cartesian product. |
| store_textfile(path) | In | None | Write to text file (sink). |