Beyond the main compute engines, Wayang supports a range of database, messaging, and specialist platforms. These platforms let the optimizer push work to where the data already lives — running a filter or join inside a database avoids moving data across the network — or tap into specialised engines for graph processing and machine learning. Each platform follows the same plugin registration pattern as Java Streams, Spark, and Flink: add the Maven dependency, call the appropriate .plugin() factory, and register it on your WayangContext.
PostgreSQL
The PostgreSQL platform adapter uses JDBC to push Wayang logical operators into PostgreSQL as SQL queries. When the optimizer routes an operator to PostgreSQL, Wayang generates and executes SQL instead of extracting rows to process externally. This is particularly efficient for filter-heavy pipelines where early selectivity reduces the data that leaves the database.
Maven dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-postgres</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Entry-point class: Postgres
import org.apache.wayang.postgres.Postgres;
// Core SQL-pushdown operators (filter, join, projection, global reduce, table sink)
Postgres.plugin()
// Channel-conversion rules (SQL query channel ↔ Java Collection, etc.)
Postgres.conversionPlugin()
// Direct platform reference (rarely needed)
Postgres.platform()
Supported operators
The PostgreSQL plugin maps these logical operators to SQL execution:
| Logical operator | SQL equivalent |
|---|---|
| FilterOperator | WHERE clause |
| ProjectionOperator | SELECT column list |
| JoinOperator | INNER JOIN |
| GlobalReduceOperator | aggregate functions (SUM, COUNT, etc.) |
| TableSource | SELECT * FROM <table> |
| TableSink | INSERT INTO <table> |
Connection configuration
Configure the JDBC connection in your Configuration object. The property keys follow the pattern wayang.postgres.<key>:
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres;
Configuration conf = new Configuration();
conf.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/mydb");
conf.setProperty("wayang.postgres.jdbc.user", "wayang_user");
conf.setProperty("wayang.postgres.jdbc.password", "s3cr3t");
// Optional: tune cost-model estimates for your hardware
conf.setProperty("wayang.postgres.cpu.mhz", "3000");
conf.setProperty("wayang.postgres.cores", "4");
WayangContext wayang = new WayangContext(conf)
.withPlugin(Java.basicPlugin()) // fallback for operators not supported in SQL
.withPlugin(Postgres.plugin())
.withPlugin(Postgres.conversionPlugin());
The full set of default cost-model properties (from wayang-postgres-defaults.properties):
# wayang.postgres.jdbc.url = jdbc:postgresql://host:5432/dbname
# wayang.postgres.jdbc.user = <username>
# wayang.postgres.jdbc.password = <password>
wayang.postgres.cpu.mhz = 2700
wayang.postgres.cores = 2
wayang.postgres.costs.fix = 0.0
wayang.postgres.costs.per-ms = 1.0
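To make the relationship between these defaults and the overrides set earlier concrete, here is a minimal sketch using only java.util.Properties. It is an illustration of "defaults plus overrides" layering, not a reproduction of how Wayang's Configuration class resolves values; the class and method names (CostModelDefaultsSketch, withOverrides) are hypothetical.

```java
import java.util.Properties;
import java.util.TreeSet;

// Illustrative only: models "defaults plus overrides" with java.util.Properties.
// It does not use or reproduce Wayang's Configuration class.
final class CostModelDefaultsSketch {

    /** Defaults copied from the wayang-postgres-defaults.properties listing. */
    static Properties defaults() {
        Properties d = new Properties();
        d.setProperty("wayang.postgres.cpu.mhz", "2700");
        d.setProperty("wayang.postgres.cores", "2");
        d.setProperty("wayang.postgres.costs.fix", "0.0");
        d.setProperty("wayang.postgres.costs.per-ms", "1.0");
        return d;
    }

    /** Layers two user overrides on top of the defaults; unset keys fall through. */
    static Properties withOverrides(String cpuMhz, String cores) {
        Properties p = new Properties(defaults());
        p.setProperty("wayang.postgres.cpu.mhz", cpuMhz);
        p.setProperty("wayang.postgres.cores", cores);
        return p;
    }

    public static void main(String[] args) {
        Properties effective = withOverrides("3000", "4");
        // stringPropertyNames() also includes keys that exist only in the defaults.
        for (String key : new TreeSet<>(effective.stringPropertyNames())) {
            System.out.println(key + " = " + effective.getProperty(key));
        }
    }
}
```

With the overrides from the connection example (3000 MHz, 4 cores), the two cost keys keep their default values while the hardware keys take the user-supplied ones.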
Reading from a table: TableSource
Use TableSource to read rows from a PostgreSQL table as Wayang data quanta. Pair it with ProjectionOperator to push column selection into the SQL query:
import org.apache.wayang.basic.operators.TableSource;
// Read all columns from the 'orders' table
TableSource ordersSource = new TableSource("orders");
// Read only specific columns — Wayang pushes these as SELECT col1, col2 FROM orders
TableSource narrowSource = new TableSource("orders", "order_id", "customer_id", "amount");
In the fluent builder API:
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres;
import java.util.Collection;
public class PostgresTableRead {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/sales");
conf.setProperty("wayang.postgres.jdbc.user", "analyst");
conf.setProperty("wayang.postgres.jdbc.password", "password");
WayangContext wayang = new WayangContext(conf)
.withPlugin(Java.basicPlugin())
.withPlugin(Postgres.plugin())
.withPlugin(Postgres.conversionPlugin());
Collection<Record> highValueOrders =
new JavaPlanBuilder(wayang)
.withJobName("HighValueOrders")
.withUdfJarOf(PostgresTableRead.class)
// Source: read from the 'orders' table
.readTable(new org.apache.wayang.basic.operators.TableSource("orders", "order_id", "amount"))
// Filter pushed to PostgreSQL as: WHERE amount > 1000
.filter(record -> (double) record.getField(1) > 1000.0)
.collect();
highValueOrders.forEach(System.out::println);
}
}
When Postgres.plugin() is registered, Wayang’s optimizer can push the filter into PostgreSQL as a WHERE clause, so only matching rows leave the database. If only Java.basicPlugin() is registered, the filter runs locally after all rows are fetched.
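The effect of pushdown is easier to see as a string. The sketch below shows how a table source, a column projection, and a filter condition collapse into a single SQL statement. It is a hypothetical helper written for illustration (PushdownSqlSketch and compose are not part of Wayang), and it makes no claim about how Wayang's own SQL generation is implemented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative only: a hypothetical helper, not Wayang's SQL generator.
final class PushdownSqlSketch {

    /** Builds the single statement that source + projection + filter collapse into. */
    static String compose(String table, List<String> columns, String whereClause) {
        // No explicit columns means "read everything", as with a bare table source.
        String projection = columns.isEmpty() ? "*" : String.join(", ", columns);
        StringBuilder sql = new StringBuilder("SELECT ")
                .append(projection)
                .append(" FROM ")
                .append(table);
        if (whereClause != null && !whereClause.isEmpty()) {
            sql.append(" WHERE ").append(whereClause);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Source only: every row and column crosses the network.
        System.out.println(compose("orders", Collections.<String>emptyList(), null));
        // Projection and filter pushed down: only matching rows and two columns are returned.
        System.out.println(compose("orders", Arrays.asList("order_id", "amount"), "amount > 1000"));
    }
}
```

The second statement is the shape of query the example pipeline aims for: the database evaluates the predicate, and the Java side only receives the rows it needs.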
SQLite3
The SQLite3 platform adapter is structurally identical to PostgreSQL but targets the SQLite embedded database engine. It is useful for edge deployments, offline scenarios, and integration tests that need an in-process SQL engine without a running server.
Maven dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-sqlite3</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Entry-point class: Sqlite3
import org.apache.wayang.sqlite3.Sqlite3;
// SQL-pushdown operators (filter, projection, table sink)
Sqlite3.plugin()
// Channel-conversion rules
Sqlite3.conversionPlugin()
// Direct platform reference
Sqlite3.platform()
Connection configuration
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.sqlite3.Sqlite3;
Configuration conf = new Configuration();
// Point to a file-based SQLite database
conf.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite:/path/to/mydb.sqlite");
// For an in-memory database (test/CI use):
// conf.setProperty("wayang.sqlite3.jdbc.url", "jdbc:sqlite::memory:");
WayangContext wayang = new WayangContext(conf)
.withPlugin(Java.basicPlugin())
.withPlugin(Sqlite3.plugin())
.withPlugin(Sqlite3.conversionPlugin());
Default cost-model properties (from wayang-sqlite3-defaults.properties):
# wayang.sqlite3.jdbc.url = jdbc:sqlite:/path/to/file.db
wayang.sqlite3.cpu.mhz = 2700
wayang.sqlite3.cores = 2
wayang.sqlite3.costs.fix = 0.0
wayang.sqlite3.costs.per-ms = 1.0
SQLite3’s operator coverage is a subset of PostgreSQL’s. If an operator cannot be mapped to SQLite3 SQL, Wayang falls back to Java Streams automatically — as long as Java.basicPlugin() is also registered.
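The fallback rule can be pictured as a simple lookup. The sketch below is a deliberately simplified model with hypothetical names (FallbackSketch, platformFor); Wayang's optimizer chooses platforms by cost rather than by a fixed rule, so treat this only as an illustration of why Java.basicPlugin() needs to be registered. The operator set is taken from the SQLite3 row of the summary table on this page.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: hypothetical names; the real optimizer is cost-based.
final class FallbackSketch {

    /** Operators listed for SQLite3 in the summary table on this page. */
    static final Set<String> SQLITE3_OPS = Collections.unmodifiableSet(
            new HashSet<>(Arrays.asList("TableSource", "Filter", "Projection", "TableSink")));

    /** Returns the platform that would run the operator under this simplified rule. */
    static String platformFor(String operator, boolean javaRegistered) {
        if (SQLITE3_OPS.contains(operator)) {
            return "sqlite3";
        }
        if (javaRegistered) {
            return "java"; // fallback for operators with no SQLite3 mapping
        }
        // Modelled here as an exception: no registered platform can run the operator.
        throw new IllegalStateException("No registered platform can execute " + operator);
    }

    public static void main(String[] args) {
        System.out.println("Filter -> " + platformFor("Filter", true));
        System.out.println("Join   -> " + platformFor("Join", true));
    }
}
```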
Apache Kafka
Wayang treats Apache Kafka as a first-class data source and sink through two logical operators: KafkaTopicSource and KafkaTopicSink. Both live in wayang-basic (the common operator library) and are executed on either the Java Streams or Apache Spark platform using mappings bundled in those modules, so no separate wayang-kafka platform module is needed.
Using KafkaTopicSource
KafkaTopicSource reads string messages from a Kafka topic. It is a UnarySource<String>:
import org.apache.wayang.basic.operators.KafkaTopicSource;
import java.util.Properties;
// Constructor 1: topic name only (uses default encoding UTF-8)
KafkaTopicSource source = new KafkaTopicSource("my-topic");
// Constructor 2: explicit encoding
KafkaTopicSource sourceWithEncoding = new KafkaTopicSource("my-topic", "UTF-8");
Using KafkaTopicSink
KafkaTopicSink<T> writes all incoming data quanta to a Kafka topic, converting each record to a string:
import org.apache.wayang.basic.operators.KafkaTopicSink;
// Writes Object.toString() of each element to the topic
KafkaTopicSink<String> sink = new KafkaTopicSink<>("output-topic", String.class);
Kafka connection properties
Kafka connection credentials are supplied via a java.util.Properties object or via environment variables. The client looks for these property keys by default:
bootstrap.servers = <your-broker>:9092
security.protocol = SASL_SSL
sasl.mechanism = PLAIN
sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule \
required username='<API_KEY>' password='<API_SECRET>';
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms = 45000
acks = all
You can load a properties file instead of setting environment variables:
Properties kafkaProps = KafkaTopicSource.loadConfig("/path/to/kafka.properties");
KafkaTopicSource source = new KafkaTopicSource("events");
// getConsumer(Properties) creates and subscribes a KafkaConsumer using the provided properties
KafkaConsumer<String, String> consumer = source.getConsumer(kafkaProps);
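If you keep these settings in a .properties file, note that the sasl.jaas.config entry spans two lines using a trailing backslash. The sketch below parses a shortened copy of the listing with plain java.util.Properties to show how that continuation is joined. It is independent of KafkaTopicSource.loadConfig, whose internals are not described here; the class name KafkaPropsSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative only: plain JDK parsing, independent of KafkaTopicSource.loadConfig.
final class KafkaPropsSketch {

    /** A shortened copy of the property listing, including the two-line JAAS entry. */
    static final String SAMPLE = String.join("\n",
            "bootstrap.servers = <your-broker>:9092",
            "security.protocol = SASL_SSL",
            "sasl.mechanism = PLAIN",
            "sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule \\",
            "    required username='<API_KEY>' password='<API_SECRET>';",
            "acks = all");

    /** Parses properties-file text; a trailing backslash continues the value on the next line. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE);
        // The continuation line is appended with its leading whitespace removed.
        System.out.println(props.getProperty("sasl.jaas.config"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The space before the backslash is kept and the indentation of the continuation line is dropped, so the JAAS value comes out as one line with a single space between the module name and `required`.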
The wayang-spark-kafka.properties reference file in conf/ shows producer client settings for connecting to a local Kafka broker:
bootstrap.servers = localhost:9092
acks = all
retries = 0
batch.size = 16384
linger.ms = 1
buffer.memory = 33554432
key.serializer = org.apache.kafka.common.serialization.StringSerializer
value.serializer = org.apache.kafka.common.serialization.StringSerializer
Registering Kafka support
Because KafkaTopicSource and KafkaTopicSink are executed on the Java or Spark platform, you register those platforms as usual — no extra Kafka-specific plugin call is required:
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.basic.operators.KafkaTopicSource;
import org.apache.wayang.api.JavaPlanBuilder;
WayangContext wayang = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin()); // KafkaTopicSourceMapping is included here
new JavaPlanBuilder(wayang)
.withJobName("KafkaPipeline")
.withUdfJarOf(MyJob.class)
.load(new KafkaTopicSource("input-topic"))
.map(String::toUpperCase)
.load(new org.apache.wayang.basic.operators.KafkaTopicSink<>("output-topic", String.class));
KafkaTopicSource uses a cardinality fallback estimate of 1,000–100,000,000 records at 70% confidence because the actual message count in a topic is not knowable at plan time. The optimizer will treat this source as potentially large. Register Spark alongside Java if you expect high-volume Kafka topics.
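To get a feel for how wide that fallback range is, the sketch below models it as a small value class and collapses it to a single point with a geometric mean, one common way to summarise a range that spans several orders of magnitude. The class CardinalityRangeSketch and its methods are hypothetical and are not Wayang's estimator API.

```java
// Illustrative only: a hypothetical value class, not Wayang's estimator API.
final class CardinalityRangeSketch {
    final long lower;
    final long upper;
    final double confidence;

    CardinalityRangeSketch(long lower, long upper, double confidence) {
        if (lower < 0 || upper < lower) {
            throw new IllegalArgumentException("expected 0 <= lower <= upper");
        }
        this.lower = lower;
        this.upper = upper;
        this.confidence = confidence;
    }

    /** Geometric mean of the bounds: a point estimate suited to wide ranges. */
    double geometricMean() {
        return Math.sqrt((double) lower * (double) upper);
    }

    /** True when the upper bound alone is enough to exceed the given threshold. */
    boolean mayExceed(long threshold) {
        return upper > threshold;
    }

    public static void main(String[] args) {
        // The Kafka fallback described above: 1,000 to 100,000,000 records at 70% confidence.
        CardinalityRangeSketch kafka = new CardinalityRangeSketch(1_000L, 100_000_000L, 0.7);
        System.out.printf("geometric mean: %.0f records%n", kafka.geometricMean());
        System.out.println("may exceed 10 million: " + kafka.mayExceed(10_000_000L));
    }
}
```

For the Kafka fallback the geometric mean is roughly 316,000 records, but the upper bound is five orders of magnitude above the lower one, which is why a plan that only works for small inputs is a risky choice for this source.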
Apache Giraph
The Apache Giraph platform adapter exposes graph-processing operators using Giraph’s bulk-synchronous parallel (BSP) model. It is primarily used for large-scale PageRank on graphs that are stored in HDFS.
Maven dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-giraph</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Entry-point class: Giraph
import org.apache.wayang.giraph.Giraph;
// Graph operators: PageRank via Giraph BSP
Giraph.plugin()
// Direct platform reference
Giraph.platform()
Giraph.plugin() maps Wayang’s logical PageRankOperator to GiraphPageRankOperator, which submits the algorithm to a Giraph cluster.
Registration example
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.spark.Spark;
import org.apache.wayang.giraph.Giraph;
WayangContext wayang = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin())
.withPlugin(Spark.basicPlugin())
.withPlugin(Giraph.plugin()); // PageRank can now run on Giraph
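For readers who want a reminder of what a PageRank computation does, here is a small plain-Java power-iteration sketch. It is not Giraph's or Wayang's implementation: the class PageRankSketch, the damping factor of 0.85, and the requirement that every vertex has an outgoing edge are assumptions made to keep the example short. Each loop iteration plays the role of one BSP superstep, in which every vertex sends a share of its rank along its outgoing edges.

```java
import java.util.Arrays;

// Illustrative only: a single-machine sketch, not the Giraph implementation.
final class PageRankSketch {

    /**
     * edges[i] = {source, target}; vertices are numbered 0..n-1.
     * Assumes every vertex has at least one outgoing edge (no dangling vertices).
     */
    static double[] pageRank(int n, int[][] edges, double damping, int iterations) {
        int[] outDegree = new int[n];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);
        for (int it = 0; it < iterations; it++) {
            // One "superstep": start from the teleport share, then add incoming contributions.
            double[] next = new double[n];
            Arrays.fill(next, (1.0 - damping) / n);
            for (int[] e : edges) {
                next[e[1]] += damping * rank[e[0]] / outDegree[e[0]];
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Vertices 1 and 2 both link to 0, and 0 links back to 1; nothing links to 2.
        int[][] edges = {{0, 1}, {1, 0}, {2, 0}};
        System.out.println(Arrays.toString(pageRank(3, edges, 0.85, 30)));
    }
}
```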
TensorFlow
The TensorFlow platform adapter brings deep-learning operators into Wayang pipelines. It exposes a DLTrainingOperator for training a model and a PredictOperator for inference, both backed by the TensorFlow Java API.
Maven dependency
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-tensorflow</artifactId>
<version>WAYANG_VERSION</version>
</dependency>
Entry-point class: Tensorflow
import org.apache.wayang.tensorflow.Tensorflow;
// DL operators: CollectionSource, DLTrainingOperator, PredictOperator
Tensorflow.plugin()
// Channel-conversion rules (TensorChannel ↔ Java Collection)
Tensorflow.channelConversionPlugin()
// Direct platform reference
Tensorflow.platform()
Supported operators
The TensorFlow plugin maps three logical operators:
| Logical operator | TensorFlow implementation |
|---|---|
| CollectionSource | CollectionSourceMapping |
| DLTrainingOperator | DLTrainingOperatorMapping (trains a TF model) |
| PredictOperator | PredictMapping (runs inference on a trained model) |
Registration example
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.java.Java;
import org.apache.wayang.tensorflow.Tensorflow;
WayangContext wayang = new WayangContext(new Configuration())
.withPlugin(Java.basicPlugin()) // pre/post-processing
.withPlugin(Tensorflow.plugin()) // DL training and inference
.withPlugin(Tensorflow.channelConversionPlugin()); // data transfer to/from TF
The TensorFlow platform communicates with TF via the TensorFlow Java API (org.tensorflow). Ensure that the TensorFlow native libraries are on the library path of the JVM running Wayang, or include the appropriate TF Java dependency with bundled natives.
| Platform | Module | Entry point | Key operators |
|---|---|---|---|
| PostgreSQL | wayang-postgres | Postgres.plugin() | TableSource, Filter, Projection, Join, GlobalReduce, TableSink |
| SQLite3 | wayang-sqlite3 | Sqlite3.plugin() | TableSource, Filter, Projection, TableSink |
| Apache Kafka | wayang-java / wayang-spark | KafkaTopicSource / KafkaTopicSink | Source and sink logical operators |
| Apache Giraph | wayang-giraph | Giraph.plugin() | PageRankOperator |
| TensorFlow | wayang-tensorflow | Tensorflow.plugin() | DLTrainingOperator, PredictOperator |