Apache Wayang’s SQL API lets you write standard SQL and have Wayang execute it on any combination of registered platforms — local Java Streams, Apache Spark, PostgreSQL, and others. Under the hood, the SQL string is parsed by Apache Calcite, validated and converted into a relational algebra tree, translated into Wayang operators by WayangRelConverter, and handed to the platform-agnostic cost-based optimizer. The pipeline from SQL text to execution is entirely transparent; you write SQL, Wayang handles the rest.

Maven dependency

Add the SQL API module alongside your platform dependencies:
pom.xml
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-api-sql</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>

<!-- Execution platforms -->
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-java</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-spark_2.12</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>
<!-- Optional: for database pushdown -->
<dependency>
  <groupId>org.apache.wayang</groupId>
  <artifactId>wayang-postgres</artifactId>
  <version>WAYANG_VERSION</version>
</dependency>

SqlContext

SqlContext is the entry point for the SQL API. It extends WayangContext and adds the Apache Calcite schema registry and the executeSql method.
import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.core.api.Configuration;
import java.sql.SQLException;

Configuration configuration = new Configuration();
try {
    SqlContext sqlContext = new SqlContext(configuration);
} catch (SQLException e) {
    throw new RuntimeException(e);
}
SqlContext automatically registers the Java, Spark, and Postgres plugins when constructed with only a Configuration. To control which plugins are active, use the two-argument constructor:
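import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.plugin.Plugin;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres; // requires the optional wayang-postgres dependency
import org.apache.wayang.spark.Spark;
import java.sql.SQLException;
import java.util.List;

Configuration configuration = new Configuration();

// Plugins passed to the constructor are registered when the context is created.
List<Plugin> plugins = List.of(
    Java.channelConversionPlugin(),
    Postgres.conversionPlugin()
);

try {
    SqlContext sqlContext = new SqlContext(configuration, plugins);
    // Register additional plugins as needed.
    sqlContext.withPlugin(Java.basicPlugin());
    sqlContext.withPlugin(Spark.basicPlugin());
} catch (SQLException e) {
    throw new RuntimeException(e);
}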

SqlContext constructors

Constructor | Description
new SqlContext() | Default configuration; auto-registers the Java, Spark, and Postgres plugins. Throws SQLException.
new SqlContext(Configuration configuration) | Custom configuration; auto-registers the Java, Spark, and Postgres plugins. Throws SQLException.
new SqlContext(Configuration configuration, List<Plugin> plugins) | Custom configuration with an explicit plugin list. Throws SQLException.

Schema registration

Wayang’s SQL API uses Apache Calcite schemas to map table names in SQL to physical data sources. You configure the schema by setting the wayang.calcite.model property in your Configuration to an inline Calcite model JSON string.

File-backed schema (CSV)

The Calcite file adapter (org.apache.calcite.adapter.file.FileSchemaFactory) maps every CSV file in a directory to a table named after the file, without its extension.
// Build the inline Calcite model JSON.
String calciteModel = "{\n"
    + "  \"calcite\": {\n"
    + "    \"version\": \"1.0\",\n"
    + "    \"defaultSchema\": \"fs\",\n"
    + "    \"schemas\": [\n"
    + "      {\n"
    + "        \"name\": \"fs\",\n"
    + "        \"type\": \"custom\",\n"
    + "        \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\n"
    + "        \"operand\": {\n"
    + "          \"directory\": \"/data/csv-tables\"\n"
    + "        }\n"
    + "      }\n"
    + "    ]\n"
    + "  }\n"
    + "}";

Configuration configuration = new Configuration();
configuration.setProperty("wayang.calcite.model", calciteModel);

SqlContext sqlContext = new SqlContext(configuration);
With this schema, a file /data/csv-tables/orders.csv is addressable as fs.orders in SQL.
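Hand-concatenating the model JSON is easy to get wrong, for example when a directory path contains backslashes (Windows) or quotes, which must be escaped inside a JSON string. The sketch below is a hypothetical helper, `CalciteModelJson.customSchemaModel`, that produces the same single-schema shape as the model above and escapes every value; it is not part of Wayang or Calcite. The JDBC schema in the next section has the same shape, with `jdbcDriver`, `jdbcUrl`, `jdbcUser`, and `jdbcPassword` as operands.

```java
import java.util.Map;

final class CalciteModelJson {

    // Escape a value for a JSON string literal: quotes, backslashes, and control
    // characters. Windows paths such as C:\data\csv need this.
    static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Hypothetical helper: build an inline model with one custom schema, using the
    // same {"calcite": {...}} wrapper as the examples in this section.
    static String customSchemaModel(String schemaName, String factory, Map<String, String> operand) {
        StringBuilder ops = new StringBuilder();
        for (Map.Entry<String, String> e : operand.entrySet()) {
            if (ops.length() > 0) ops.append(", ");
            ops.append(jsonString(e.getKey())).append(": ").append(jsonString(e.getValue()));
        }
        return "{\"calcite\": {"
            + "\"version\": \"1.0\", "
            + "\"defaultSchema\": " + jsonString(schemaName) + ", "
            + "\"schemas\": [{"
            + "\"name\": " + jsonString(schemaName) + ", "
            + "\"type\": \"custom\", "
            + "\"factory\": " + jsonString(factory) + ", "
            + "\"operand\": {" + ops + "}"
            + "}]}}";
    }
}
```

The resulting string would be passed to `configuration.setProperty("wayang.calcite.model", ...)` exactly like the hand-written model. Passing a `LinkedHashMap` keeps the operands in insertion order.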

JDBC / PostgreSQL schema

String calciteModel = "{\n"
    + "  \"calcite\": {\n"
    + "    \"version\": \"1.0\",\n"
    + "    \"defaultSchema\": \"postgres\",\n"
    + "    \"schemas\": [\n"
    + "      {\n"
    + "        \"name\": \"postgres\",\n"
    + "        \"type\": \"custom\",\n"
    + "        \"factory\": \"org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory\",\n"
    + "        \"operand\": {\n"
    + "          \"jdbcDriver\":   \"org.postgresql.Driver\",\n"
    + "          \"jdbcUrl\":      \"jdbc:postgresql://localhost:5432/mydb\",\n"
    + "          \"jdbcUser\":     \"wayang\",\n"
    + "          \"jdbcPassword\": \"secret\"\n"
    + "        }\n"
    + "      }\n"
    + "    ]\n"
    + "  }\n"
    + "}";

configuration.setProperty("wayang.calcite.model", calciteModel);
The Calcite schema drives validation and planning. The actual data reading at runtime goes through the registered Wayang platform operators, not the JDBC connection used by the schema introspection.

Executing SQL

executeSql

public Collection<Record> executeSql(String sql) throws SqlParseException
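Parses, plans, optimizes, and executes the SQL string, returning all result rows as a Collection<Record>. Each Record holds an Object[] of field values, accessed by zero-based position. SqlParseException is Calcite's checked org.apache.calcite.sql.parser.SqlParseException, so callers must catch or declare it.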
Collection<Record> results = sqlContext.executeSql(
    "SELECT name, COUNT(*) AS cnt " +
    "FROM fs.orders " +
    "GROUP BY name " +
    "ORDER BY cnt DESC"
);

for (Record record : results) {
    System.out.println(record.getField(0) + " → " + record.getField(1));
}

Command-line execution

SqlContext has a main method for running SQL files from the command line:
./bin/wayang-submit org.apache.wayang.api.sql.context.SqlContext \
  -c /path/to/wayang.properties \
  -q /path/to/query.sql \
  -o /path/to/output.txt \
  -p java,spark
Flag | Description
-c, --config | Path to the Wayang configuration properties file.
-q, --query | Path to a file containing the SQL statement (no trailing semicolon).
-o, --output | Path where result records are written, one per line.
-p, --platforms | Comma-separated platform keys (e.g., java,spark).
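The `-q` flag expects exactly one statement without a trailing semicolon. If your query files come from other SQL tools, a small preprocessing step can normalize them first. `QueryFiles` is a hypothetical helper, not part of Wayang, and it deliberately does not parse comments or split multiple statements.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class QueryFiles {

    // Hypothetical helper: read a .sql file and normalize it for the SQL API.
    static String loadQuery(Path file) throws IOException {
        return normalize(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
    }

    // Trim surrounding whitespace and drop any trailing semicolons.
    static String normalize(String sql) {
        String s = sql.strip();
        while (s.endsWith(";")) {
            s = s.substring(0, s.length() - 1).strip();
        }
        return s;
    }
}
```

In a Java program, the normalized string can be passed directly to `executeSql`.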

Supported SQL operations

The WayangRelConverter translates Apache Calcite relational nodes into Wayang operators. The following SQL constructs are supported.

SELECT and projection

SELECT name, revenue, quantity
FROM fs.sales
Translated to a MapOperator that performs the projection. SELECT * (all columns) is also supported.
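Conceptually, a projection is a per-row function that keeps the selected columns in SELECT order. The sketch below models rows as `Object[]` (the representation a Record wraps) and builds that function from column indexes. `ProjectionSketch` is a hypothetical name; the code illustrates the semantics only, not how WayangRelConverter constructs its MapOperator.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ProjectionSketch {

    // Build a row -> row function that keeps the given column indexes, in order.
    static Function<Object[], Object[]> project(int... columns) {
        return row -> {
            Object[] out = new Object[columns.length];
            for (int i = 0; i < columns.length; i++) {
                out[i] = row[columns[i]];
            }
            return out;
        };
    }

    // Apply the projection to every input row, like a map over the collection.
    static List<Object[]> apply(List<Object[]> rows, Function<Object[], Object[]> f) {
        return rows.stream().map(f).collect(Collectors.toList());
    }
}
```

For a sales table whose columns are assumed to be (name, quantity, revenue, region), `SELECT name, revenue, quantity` corresponds to `project(0, 2, 1)`.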

WHERE (filter)

SELECT * FROM fs.orders WHERE status = 'shipped' AND amount > 100
Supported filter operators:
  • =, <>, !=
  • <, >, <=, >=
  • AND, OR, NOT
  • IS NULL, IS NOT NULL
  • CAST(col AS type)
  • LIKE, BETWEEN
Translated to a FilterOperator with a FilterPredicateImpl that evaluates the Calcite RexNode expression tree.
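Evaluating a filter means walking an expression tree for each row and keeping the row only if the result is TRUE. The sketch below mimics that idea for the WHERE clause above with a tiny hand-rolled tree; the `Expr` interface and its helpers are hypothetical and are not Calcite's RexNode API. It follows SQL's three-valued logic, using `null` for UNKNOWN, so a comparison against NULL never lets a row through.

```java
import java.util.function.Predicate;

final class FilterSketch {

    // A row-level expression; a null result stands for SQL UNKNOWN.
    interface Expr {
        Object eval(Object[] row);
    }

    static Expr col(int index) { return row -> row[index]; }

    static Expr lit(Object value) { return row -> value; }

    // Equality via equals(); UNKNOWN if either side is NULL. (A real evaluator
    // would also coerce types, e.g. Integer 100 vs Long 100.)
    static Expr eq(Expr a, Expr b) {
        return row -> {
            Object x = a.eval(row);
            Object y = b.eval(row);
            if (x == null || y == null) return null;
            return x.equals(y);
        };
    }

    // Numeric greater-than; UNKNOWN if either side is NULL.
    static Expr gt(Expr a, Expr b) {
        return row -> {
            Object x = a.eval(row);
            Object y = b.eval(row);
            if (x == null || y == null) return null;
            return ((Number) x).doubleValue() > ((Number) y).doubleValue();
        };
    }

    // Three-valued AND: FALSE if either side is FALSE, else UNKNOWN if either
    // side is UNKNOWN, else TRUE.
    static Expr and(Expr a, Expr b) {
        return row -> {
            Boolean x = (Boolean) a.eval(row);
            Boolean y = (Boolean) b.eval(row);
            if (Boolean.FALSE.equals(x) || Boolean.FALSE.equals(y)) return false;
            if (x == null || y == null) return null;
            return true;
        };
    }

    // WHERE keeps a row only when the condition evaluates to TRUE.
    static Predicate<Object[]> where(Expr condition) {
        return row -> Boolean.TRUE.equals(condition.eval(row));
    }
}
```

With rows assumed to be (status, amount), `and(eq(col(0), lit("shipped")), gt(col(1), lit(100)))` corresponds to `status = 'shipped' AND amount > 100`.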

JOIN

SELECT u.name, o.total
FROM fs.users u
JOIN fs.orders o ON u.id = o.user_id
Single-condition equi-joins are translated to JoinOperator. Multi-condition joins (e.g., ON a.x = b.x AND a.y = b.y) use WayangMultiConditionJoinVisitor, which chains two key extractors. Cross joins (ON TRUE / no condition) use WayangCrossJoinVisitor.
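An equi-join is commonly executed as a hash join: index one input by its key, then probe that index with the other input's key. The sketch below is that textbook technique over `Object[]` rows, not Wayang's JoinOperator implementation; `HashJoinSketch` is hypothetical. For a multi-condition join it folds the key columns into one composite key, which is one simple way to express several equality conditions and may differ from how WayangMultiConditionJoinVisitor chains its key extractors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class HashJoinSketch {

    // Inner equi-join: build a hash index on the left input, probe with the right.
    // Each output row is the left row's columns followed by the right row's.
    static List<Object[]> join(List<Object[]> left, List<Object[]> right,
                               Function<Object[], Object> leftKey,
                               Function<Object[], Object> rightKey) {
        Map<Object, List<Object[]>> index = new HashMap<>();
        for (Object[] l : left) {
            Object k = leftKey.apply(l);
            if (k != null) { // NULL keys never match under SQL equality
                index.computeIfAbsent(k, x -> new ArrayList<>()).add(l);
            }
        }
        List<Object[]> out = new ArrayList<>();
        for (Object[] r : right) {
            Object k = rightKey.apply(r);
            if (k == null) continue;
            for (Object[] l : index.getOrDefault(k, List.of())) {
                Object[] row = new Object[l.length + r.length];
                System.arraycopy(l, 0, row, 0, l.length);
                System.arraycopy(r, 0, row, l.length, r.length);
                out.add(row);
            }
        }
        return out;
    }

    // Key extractor over several columns, for ON a.x = b.x AND a.y = b.y.
    // Any NULL component makes the whole key NULL so the row cannot match.
    static Function<Object[], Object> compositeKey(int... cols) {
        return row -> {
            List<Object> key = new ArrayList<>(cols.length);
            for (int c : cols) {
                if (row[c] == null) return null;
                key.add(row[c]);
            }
            return key;
        };
    }
}
```

For the query above, assuming users are (id, name) and orders are (order_id, user_id, total), the extractors would be `u -> u[0]` and `o -> o[1]`.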

GROUP BY and aggregates

SELECT department, COUNT(*), SUM(salary), AVG(salary), MIN(salary), MAX(salary)
FROM fs.employees
GROUP BY department
Supported aggregate functions:
  • COUNT(*) / COUNT(col)
  • SUM(col)
  • MIN(col)
  • MAX(col)
  • AVG(col)
GROUP BY is translated to ReduceByOperator. A bare aggregate with no GROUP BY (e.g., SELECT COUNT(*) FROM t) is translated to GlobalReduceOperator.
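A reduce-by-key combines rows of the same group pairwise, so every aggregate must be expressible as a mergeable partial result. COUNT, SUM, MIN, and MAX merge directly, but AVG does not: averaging two partial averages is wrong unless the counts are known. The usual approach, sketched below in plain Java, carries (count, sum, min, max) as the partial aggregate and computes AVG only at the end. `GroupBySketch` and its `Agg` accumulator are hypothetical; this illustrates the technique, not Wayang's code, and assumes non-NULL numeric values.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupBySketch {

    // Partial aggregate that merges associatively, as a reduce step requires.
    static final class Agg {
        long count;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;

        static Agg of(double v) {
            Agg a = new Agg();
            a.count = 1; a.sum = v; a.min = v; a.max = v;
            return a;
        }

        // Combine two partial aggregates (the reduce function).
        Agg merge(Agg o) {
            Agg a = new Agg();
            a.count = count + o.count;
            a.sum = sum + o.sum;
            a.min = Math.min(min, o.min);
            a.max = Math.max(max, o.max);
            return a;
        }

        // AVG is derived from the merged state, never merged itself.
        double avg() { return sum / count; }
    }

    // GROUP BY keyColumn with COUNT(*), SUM, AVG, MIN, MAX over valueColumn.
    static Map<Object, Agg> groupBy(List<Object[]> rows, int keyColumn, int valueColumn) {
        Map<Object, Agg> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            Agg one = Agg.of(((Number) row[valueColumn]).doubleValue());
            groups.merge(row[keyColumn], one, Agg::merge);
        }
        return groups;
    }

    // No GROUP BY: fold all rows into one aggregate; empty input yields count 0.
    static Agg global(List<Object[]> rows, int valueColumn) {
        return rows.stream()
            .map(r -> Agg.of(((Number) r[valueColumn]).doubleValue()))
            .reduce(Agg::merge)
            .orElseGet(Agg::new);
    }
}
```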

ORDER BY

SELECT name, score FROM fs.results ORDER BY score DESC
Translated to SortOperator. Direction (ASC/DESC) and multiple sort keys are supported.
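Sorting by several keys with mixed directions amounts to a lexicographic comparator: compare on the first key, fall back to the next key only on ties, and reverse the order for DESC. The sketch below builds such a comparator over `Object[]` rows. `SortSketch` and `SortKey` are hypothetical, and the code assumes non-NULL sort values that are mutually `Comparable` within each column, since NULL ordering varies between systems.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortSketch {

    // One ORDER BY term: a column index and a direction.
    static final class SortKey {
        final int column;
        final boolean descending;

        SortKey(int column, boolean descending) {
            this.column = column;
            this.descending = descending;
        }
    }

    // Lexicographic comparator: later keys only break ties of earlier ones.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Comparator<Object[]> orderBy(SortKey... keys) {
        Comparator<Object[]> cmp = (a, b) -> 0;
        for (SortKey k : keys) {
            Comparator<Object[]> one = (a, b) -> ((Comparable) a[k.column]).compareTo(b[k.column]);
            cmp = cmp.thenComparing(k.descending ? one.reversed() : one);
        }
        return cmp;
    }

    // Return a sorted copy, leaving the input list untouched.
    static List<Object[]> sorted(List<Object[]> rows, SortKey... keys) {
        List<Object[]> copy = new ArrayList<>(rows);
        copy.sort(orderBy(keys));
        return copy;
    }
}
```

For rows (name, score), `ORDER BY score DESC, name ASC` corresponds to `sorted(rows, new SortKey(1, true), new SortKey(0, false))`.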

Subquery removal

The optimizer applies Calcite’s SubQueryRemoveRule to rewrite IN, EXISTS, and scalar subqueries as joins before planning, so correlated subqueries in WHERE clauses are normalised automatically.
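Rewriting `x IN (SELECT y ...)` as a join only preserves the original meaning if duplicate values on the subquery side do not multiply the outer rows, which is why such rewrites join against the distinct set of subquery values (a semi-join). The plain-Java sketch below compares direct IN evaluation with a join-based evaluation on a small input. `InRewriteSketch` is hypothetical; it illustrates the equivalence, not the exact plan Calcite's SubQueryRemoveRule produces, and assumes non-NULL values.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

final class InRewriteSketch {

    // Direct semantics: keep an outer value if it appears in the subquery result.
    static List<Object> inFilter(List<Object> outer, List<Object> sub) {
        List<Object> out = new ArrayList<>();
        for (Object o : outer) {
            if (sub.contains(o)) out.add(o);
        }
        return out;
    }

    // Join-based evaluation: inner-join outer values with the subquery values.
    // With distinct = false, duplicate subquery values repeat outer rows.
    static List<Object> joinRewrite(List<Object> outer, List<Object> sub, boolean distinct) {
        List<Object> right = distinct ? new ArrayList<>(new LinkedHashSet<>(sub)) : sub;
        List<Object> out = new ArrayList<>();
        for (Object o : outer) {
            for (Object r : right) {
                if (o.equals(r)) out.add(o);
            }
        }
        return out;
    }
}
```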

How SQL maps to Wayang operators

The pipeline from SQL text to execution goes through three stages:

1. Parse and validate

Optimizer.parseSql(sql) parses the SQL string with Calcite’s SqlParser into a SqlNode AST, which is then checked by Calcite’s SqlValidator. Syntax errors surface here as SqlParseException; references to unknown tables or columns are rejected by the validator.

2. Convert to relational algebra

Optimizer.convert(validatedSqlNode) converts the validated AST to Calcite’s RelNode tree using a SqlToRelConverter. This is the standard Calcite logical plan.

3. Apply Wayang rules and translate

Optimizer.optimize(relNode, ...) applies the WayangRules rule set to convert each standard Calcite RelNode into a Wayang-convention counterpart (WayangTableScan, WayangFilter, WayangProject, WayangJoin, WayangAggregate, WayangSort). WayangRelConverter.convert() then recursively walks the Wayang-convention tree and instantiates the corresponding Wayang operator objects, which are wired together into a WayangPlan and submitted to the cross-platform executor.
SQL string
    │
    ▼  Calcite SqlParser + SqlValidator
SqlNode (AST)
    │
    ▼  SqlToRelConverter
RelNode (logical relational algebra)
    │
    ▼  WayangRules (Volcano planner)
WayangRelNode tree
    │
    ▼  WayangRelConverter
Wayang operator DAG
    │
    ▼  Wayang cost-based optimizer
Execution on Java / Spark / Postgres / ...
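The WayangRelConverter step is a post-order traversal: a node can only become an operator once its inputs have been converted, so inputs always appear before their consumers in the resulting DAG. The sketch below shows that shape with a hypothetical `Node` type and operator names as plain strings ("TableSource" is a placeholder label); it uses no Calcite or Wayang classes, and the real converter instantiates and wires actual Wayang operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class ConvertSketch {

    // Hypothetical relational node: a kind (e.g. "WayangFilter") and its inputs.
    static final class Node {
        final String kind;
        final List<Node> inputs;

        Node(String kind, Node... inputs) {
            this.kind = kind;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Illustrative node-kind -> operator mapping (aggregates without GROUP BY
    // would map to GlobalReduceOperator instead).
    static final Map<String, String> OPERATOR_FOR = Map.of(
        "WayangTableScan", "TableSource",
        "WayangFilter", "FilterOperator",
        "WayangProject", "MapOperator",
        "WayangJoin", "JoinOperator",
        "WayangAggregate", "ReduceByOperator",
        "WayangSort", "SortOperator");

    // Post-order conversion: convert inputs first, then append this node as
    // "operator <- [input ids]". Returns the id of the appended entry.
    static int convert(Node node, List<String> plan) {
        List<Integer> inputIds = new ArrayList<>();
        for (Node in : node.inputs) {
            inputIds.add(convert(in, plan));
        }
        plan.add(OPERATOR_FOR.get(node.kind) + " <- " + inputIds);
        return plan.size() - 1;
    }
}
```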

Complete example: sales analytics

This example reads order data from a CSV file, filters, groups, and sorts it, then collects results.
import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.core.api.Configuration;
import java.util.Collection;

public class SalesAnalytics {

    public static void main(String[] args) throws Exception {

        // 1. Point the Calcite model at the directory of CSV files.
        String calciteModel = "{\n"
            + "  \"calcite\": {\n"
            + "    \"version\": \"1.0\",\n"
            + "    \"defaultSchema\": \"fs\",\n"
            + "    \"schemas\": [\n"
            + "      {\n"
            + "        \"name\": \"fs\",\n"
            + "        \"type\": \"custom\",\n"
            + "        \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\n"
            + "        \"operand\": { \"directory\": \"/data/csv\" }\n"
            + "      }\n"
            + "    ]\n"
            + "  }\n"
            + "}";

        Configuration configuration = new Configuration();
        configuration.setProperty("wayang.calcite.model", calciteModel);

        // 2. Create the SQL context (auto-registers Java + Spark + Postgres).
        SqlContext ctx = new SqlContext(configuration);

        // 3. Run SQL directly — Wayang handles planning and execution.
        Collection<Record> results = ctx.executeSql(
            "SELECT region, SUM(amount) AS total_sales, COUNT(*) AS num_orders " +
            "FROM fs.orders " +
            "WHERE status = 'completed' " +
            "GROUP BY region " +
            "ORDER BY total_sales DESC"
        );

        // 4. Print results — each Record is an Object[] row.
        System.out.println("region\ttotal_sales\tnum_orders");
        for (Record r : results) {
            System.out.printf("%s\t%s\t%s%n",
                r.getField(0), r.getField(1), r.getField(2));
        }
    }
}

Accessing Record fields

A Record wraps an Object[] of field values. Access fields by position:
Record r = results.iterator().next();

// By zero-based index
Object  field0  = r.getField(0);
int     intVal  = r.getInt(1);
double  dblVal  = r.getDouble(2);
long    longVal = r.getLong(3);
int     size    = r.size();          // number of fields
Object[] values = r.getValues();     // raw array
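The Java type behind a field can depend on the column type Calcite infers and on the source of the data, so an aggregate such as SUM may arrive as, for example, an Integer, Long, Double, or BigDecimal, and an untyped CSV column may arrive as a String. When in doubt, reading through `Number` avoids a ClassCastException. `Fields` below is a hypothetical convenience over a row's raw values (such as what `getValues()` returns), not part of Wayang's Record API.

```java
import java.math.BigDecimal;

final class Fields {

    // Read a numeric field as a double whatever its boxed type; NULL becomes NaN.
    static double asDouble(Object[] values, int index) {
        Object v = values[index];
        if (v == null) return Double.NaN;
        if (v instanceof Number) return ((Number) v).doubleValue();
        // Fall back to parsing text, e.g. a CSV column typed as a string.
        return Double.parseDouble(v.toString().trim());
    }

    // Read an integral field as a long. Throws ArithmeticException if the value
    // has a non-zero fractional part or does not fit, NullPointerException if NULL.
    static long asLong(Object[] values, int index) {
        Object v = values[index];
        if (v == null) throw new NullPointerException("field " + index + " is NULL");
        if (v instanceof Long || v instanceof Integer || v instanceof Short || v instanceof Byte) {
            return ((Number) v).longValue();
        }
        BigDecimal d = (v instanceof BigDecimal) ? (BigDecimal) v : new BigDecimal(v.toString().trim());
        return d.longValueExact();
    }
}
```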

Working with PostgreSQL pushdown

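When wayang-postgres is on the classpath and the Calcite schema points to a PostgreSQL database, Wayang can push SELECT, WHERE, GROUP BY, and ORDER BY clauses directly into the database, retrieving only the aggregated results. Register the Postgres conversion plugin (Postgres.conversionPlugin()) and the Postgres platform plugin (Postgres.plugin()) alongside the Java plugins:
import org.apache.wayang.api.sql.context.SqlContext;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.java.Java;
import org.apache.wayang.postgres.Postgres;
import org.apache.calcite.sql.parser.SqlParseException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;

// `configuration` holds a Calcite model whose schema "postgres" points at the
// database (see the JDBC / PostgreSQL schema example above).
try {
    SqlContext ctx = new SqlContext(configuration, List.of(
        Java.channelConversionPlugin(),
        Postgres.conversionPlugin()
    ));
    ctx.withPlugin(Java.basicPlugin());
    ctx.withPlugin(Postgres.plugin());

    Collection<Record> results = ctx.executeSql(
        "SELECT customer_id, SUM(amount) " +
        "FROM postgres.transactions " +
        "GROUP BY customer_id"
    );
    results.forEach(System.out::println);
} catch (SQLException | SqlParseException e) {
    // SQLException comes from the constructor, SqlParseException from executeSql.
    throw new RuntimeException(e);
}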
The Wayang cost model will evaluate whether it is cheaper to pull the data to Java/Spark or to push the aggregation down to PostgreSQL, and will choose accordingly.
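The trade-off can be illustrated with a deliberately simplified cost function: moving N raw rows out of the database and aggregating elsewhere, versus aggregating inside the database and moving only G result rows. All weights below are made-up parameters for illustration; Wayang's actual cost model is considerably more detailed, and `PushdownCostSketch` does not reproduce it.

```java
final class PushdownCostSketch {

    // Toy cost of pulling all rows out, then aggregating in Java/Spark.
    // Every weight is an illustrative assumption, not a Wayang parameter.
    static double pullThenAggregate(long rows, double scanCost, double transferCost, double aggCost) {
        return rows * scanCost + rows * transferCost + rows * aggCost;
    }

    // Toy cost of aggregating inside the database, then moving only the groups.
    static double pushDownAggregate(long rows, long groups, double scanCost,
                                    double transferCost, double dbAggCost) {
        return rows * scanCost + rows * dbAggCost + groups * transferCost;
    }

    // Pick the cheaper plan under this toy model.
    static boolean shouldPushDown(long rows, long groups, double scanCost,
                                  double transferCost, double aggCost, double dbAggCost) {
        return pushDownAggregate(rows, groups, scanCost, transferCost, dbAggCost)
            < pullThenAggregate(rows, scanCost, transferCost, aggCost);
    }
}
```

The model captures the intuition only: pushdown pays off when transfer is expensive and grouping shrinks the data a lot, and loses when transfer is cheap or the groups are nearly as numerous as the rows.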

Known limitations

The SQL API is under active development. The following constructs are not yet supported:
  • Window functions (OVER, PARTITION BY, ROW_NUMBER())
  • UNION, INTERSECT, EXCEPT set operations at the SQL level
  • HAVING clauses (workaround: wrap in a subquery or post-filter in Java)
  • DISTINCT inside aggregate functions (e.g., COUNT(DISTINCT col))
  • Recursive CTEs (WITH RECURSIVE)
  • Non-equi joins expressed directly in SQL (they will throw IllegalStateException)
For use cases requiring these features, use the Java API or Scala API directly.
All result rows are returned as Record objects regardless of platform. Columns are ordered as they appear in the SELECT clause. Use SELECT with explicit column names rather than SELECT * to ensure a stable column order across schema evolution.
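The Java-side HAVING workaround from the list above amounts to filtering the collected result rows. Assuming a query that selects the group key and an aggregate, for example `SELECT customer_id, SUM(amount) AS total ... GROUP BY customer_id`, the sketch below keeps rows whose aggregate exceeds a threshold. `HavingWorkaround` is hypothetical and works on raw `Object[]` rows to stay self-contained; with Wayang you would apply the same test to each Record's values (e.g. via `getValues()`). NULL aggregates are dropped, matching SQL, where a comparison with NULL is not TRUE.

```java
import java.util.List;
import java.util.stream.Collectors;

final class HavingWorkaround {

    // Emulates "HAVING <aggregate> > threshold" on already-aggregated rows.
    // Assumes the aggregate sits at aggColumn and is a Number or NULL.
    static List<Object[]> having(List<Object[]> rows, int aggColumn, double threshold) {
        return rows.stream()
            .filter(r -> r[aggColumn] instanceof Number
                && ((Number) r[aggColumn]).doubleValue() > threshold)
            .collect(Collectors.toList());
    }
}
```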
