The Table API is a relational, declarative API for building both streaming and batch data pipelines. It provides a SQL-inspired fluent DSL in Java and Python, with the same query semantics whether you run on an infinite stream or a finite batch dataset.
What you’ll build
In this tutorial you will build a spend report that aggregates credit card transaction amounts by account ID and hour:
Transactions (DataGen source, 100 rows/sec) → hourly GROUP BY → Console sink
You’ll learn how to:
Create a TableEnvironment for streaming execution
Define a source table using TableDescriptor with the DataGen connector
Apply select, groupBy, and sum operations using the Table API
Write a User-Defined Function (UDF) to extend built-in functionality
Use tumbling windows for time-based aggregation
Test your streaming logic in batch mode
Prerequisites
Java
Java 11, 17, or 21
Maven 3.8.6 or later
An IDE (IntelliJ IDEA recommended)
Python
Java 11, 17, or 21 (PyFlink runs Flink on the JVM, so a JDK is still required)
Python 3.9, 3.10, 3.11, or 3.12
PyFlink: pip install apache-flink
Steps
Create the project
Java (Maven archetype)
Python
Generate a skeleton project using the Flink Table API Maven archetype:

```bash
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-table-java \
  -DarchetypeVersion=1.20.0 \
  -DgroupId=spendreport \
  -DartifactId=spendreport \
  -Dversion=0.1 \
  -Dpackage=spendreport \
  -DinteractiveMode=false
```
Maven creates a spendreport/ directory with SpendReport.java, SpendReportTest.java, and a pom.xml containing the required Flink Table API dependencies. If you are running a SNAPSHOT version of Flink, use the corresponding snapshot version in -DarchetypeVersion.
Create a new file spend_report.py. PyFlink provides all required dependencies via the apache-flink package.

```bash
python -m pip install apache-flink
touch spend_report.py
```
Create the TableEnvironment
The TableEnvironment is the entry point for all Table API and SQL operations. You configure it with EnvironmentSettings to choose between streaming and batch execution.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
```
```python
from pyflink.table import TableEnvironment, EnvironmentSettings

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Run with a single parallel task so the console output is easy to follow
t_env.get_config().set("parallelism.default", "1")
```
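Because streaming and batch pipelines share the same API, switching execution mode is a one-line change. A minimal Java sketch (the parallelism option mirrors the Python snippet above):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Same entry point, different settings: batch instead of streaming.
EnvironmentSettings batchSettings = EnvironmentSettings.inBatchMode();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);

// Run with a single parallel task for readable local console output.
batchEnv.getConfig().set("parallelism.default", "1");
```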
Define the source table
Register a source table backed by the DataGen connector. DataGen generates random rows at a configurable rate — no external system is needed. The transactions table has three columns:
accountId — a random BIGINT between 1 and 5
amount — a random BIGINT between 1 and 1000
transactionTime — a TIMESTAMP(3) with a watermark for event-time processing
```java
import org.apache.flink.table.api.*;

tEnv.createTemporaryTable("transactions",
    TableDescriptor.forConnector("datagen")
        .schema(Schema.newBuilder()
            .column("accountId", DataTypes.BIGINT())
            .column("amount", DataTypes.BIGINT())
            .column("transactionTime", DataTypes.TIMESTAMP(3))
            .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
            .build())
        .option("rows-per-second", "100")
        .option("fields.accountId.min", "1")
        .option("fields.accountId.max", "5")
        .option("fields.amount.min", "1")
        .option("fields.amount.max", "1000")
        .build());
```
The watermark declaration transactionTime - INTERVAL '5' SECOND tells Flink that events may arrive up to 5 seconds late. Flink uses this to determine when to close event-time windows: a window covering 09:00–10:00, for example, is closed once an event with transactionTime of 10:00:05 or later has been seen, because only then does the watermark pass the window's end.

```python
from pyflink.table import TableDescriptor, Schema, DataTypes

t_env.create_temporary_table(
    "transactions",
    TableDescriptor.for_connector("datagen")
        .schema(Schema.new_builder()
            .column("accountId", DataTypes.BIGINT())
            .column("amount", DataTypes.BIGINT())
            .column("transactionTime", DataTypes.TIMESTAMP(3))
            .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
            .build())
        .option("rows-per-second", "100")
        .option("fields.accountId.min", "1")
        .option("fields.accountId.max", "5")
        .option("fields.amount.min", "1")
        .option("fields.amount.max", "1000")
        .build())
```
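If you prefer SQL DDL over the TableDescriptor builder, the same source can be declared with executeSql. A sketch in Java (the name transactions_sql is illustrative, chosen to avoid clashing with the table registered above):

```java
// Equivalent datagen source declared via SQL DDL.
tEnv.executeSql(
    "CREATE TEMPORARY TABLE transactions_sql (" +
    "  accountId BIGINT," +
    "  amount BIGINT," +
    "  transactionTime TIMESTAMP(3)," +
    "  WATERMARK FOR transactionTime AS transactionTime - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'datagen'," +
    "  'rows-per-second' = '100'," +
    "  'fields.accountId.min' = '1'," +
    "  'fields.accountId.max' = '5'," +
    "  'fields.amount.min' = '1'," +
    "  'fields.amount.max' = '1000'" +
    ")");
```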
Write the report query
The report groups transactions by accountId and by the hour in which they occurred, then sums the transaction amounts. The built-in floor function rounds a timestamp down to the nearest hour boundary.

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import static org.apache.flink.table.api.Expressions.$;

public static Table report(Table transactions) {
    return transactions
        .select(
            $("accountId"),
            $("transactionTime").floor(TimeIntervalUnit.HOUR).as("logTs"),
            $("amount"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs"),
            $("amount").sum().as("amount"));
}
```
Call it from main():

```java
Table transactions = tEnv.from("transactions");
Table result = report(transactions);
result.execute().print();
```
```python
from pyflink.table.expression import TimeIntervalUnit
from pyflink.table.expressions import col

def report(transactions):
    return transactions \
        .select(
            col("accountId"),
            col("transactionTime").floor(TimeIntervalUnit.HOUR).alias("logTs"),
            col("amount")) \
        .group_by(col("accountId"), col("logTs")) \
        .select(
            col("accountId"),
            col("logTs"),
            col("amount").sum.alias("amount"))
```
Call it from main():

```python
transactions = t_env.from_path("transactions")
result = report(transactions)
result.execute().print()
```
Test in batch mode
One of Flink’s key properties is that streaming and batch programs share the same semantics. You can test your report() logic with a fixed, finite dataset in batch mode and then deploy it unchanged against a live stream.

```java
import org.apache.flink.table.api.*;
import org.apache.flink.types.Row;
import java.time.LocalDateTime;

EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

Table transactions = tEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("accountId", DataTypes.BIGINT()),
        DataTypes.FIELD("amount", DataTypes.BIGINT()),
        DataTypes.FIELD("transactionTime", DataTypes.TIMESTAMP(3))
    ),
    Row.of(1L, 188L, LocalDateTime.of(2024, 1, 1, 9, 0, 0)),
    Row.of(1L, 374L, LocalDateTime.of(2024, 1, 1, 9, 30, 0)),
    Row.of(2L, 200L, LocalDateTime.of(2024, 1, 1, 9, 15, 0)),
    Row.of(1L, 600L, LocalDateTime.of(2024, 1, 1, 10, 0, 0))
);

Table result = SpendReport.report(transactions);

// account 1 at 9am: 188 + 374 = 562
// account 2 at 9am: 200
// account 1 at 10am: 600
result.execute().print();
```
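The archetype also generates SpendReportTest.java. A sketch of what such a test can look like, assuming JUnit 5 is on the classpath (createTestTransactions is a hypothetical helper wrapping the fromValues call above):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;

class SpendReportTest {

    @Test
    void reportAggregatesPerAccountAndHour() throws Exception {
        // Hypothetical helper that builds the bounded table via tEnv.fromValues(...) as shown above.
        Table transactions = createTestTransactions();

        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = SpendReport.report(transactions).execute().collect()) {
            it.forEachRemaining(rows::add);
        }

        // Three groups: account 1 at 9am, account 2 at 9am, account 1 at 10am.
        assertEquals(3, rows.size());
    }
}
```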
```python
from datetime import datetime
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from spend_report import report

def test_report():
    settings = EnvironmentSettings.in_batch_mode()
    t_env = TableEnvironment.create(settings)

    transactions = t_env.from_elements(
        [
            (1, 188, datetime(2024, 1, 1, 9, 0, 0)),
            (1, 374, datetime(2024, 1, 1, 9, 30, 0)),
            (2, 200, datetime(2024, 1, 1, 9, 15, 0)),
            (1, 600, datetime(2024, 1, 1, 10, 0, 0)),
        ],
        DataTypes.ROW([
            DataTypes.FIELD("accountId", DataTypes.BIGINT()),
            DataTypes.FIELD("amount", DataTypes.BIGINT()),
            DataTypes.FIELD("transactionTime", DataTypes.TIMESTAMP(3))
        ])
    )

    result = report(transactions)
    rows = [row for row in result.execute().collect()]

    # account 1 at 9am: 562, account 2 at 9am: 200, account 1 at 10am: 600
    assert len(rows) == 3
    print("All tests passed!")

if __name__ == '__main__':
    test_report()
```
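Save the test (the file name spend_report_test.py is illustrative) and run it directly with the interpreter:

```bash
python spend_report_test.py
```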
(Optional) Write a User-Defined Function
Flink has many built-in functions, but you can extend the Table API with custom scalar functions (UDFs) when needed. Here is how to implement the floor-to-hour logic as a UDF:

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public class MyFloor extends ScalarFunction {
    public @DataTypeHint("TIMESTAMP(3)") LocalDateTime eval(
            @DataTypeHint("TIMESTAMP(3)") LocalDateTime timestamp) {
        return timestamp.truncatedTo(ChronoUnit.HOURS);
    }
}
```
Use the UDF in your report:

```java
import static org.apache.flink.table.api.Expressions.call;

public static Table report(Table transactions) {
    return transactions
        .select(
            $("accountId"),
            call(MyFloor.class, $("transactionTime")).as("logTs"),
            $("amount"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs"),
            $("amount").sum().as("amount"));
}
```
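Alternatively, register the function under a name so it can be referenced from both the Table API and SQL. A sketch:

```java
import static org.apache.flink.table.api.Expressions.call;

// Register once, then reference the function by name instead of by class.
tEnv.createTemporarySystemFunction("MyFloor", MyFloor.class);

Table namedResult = transactions.select(
    $("accountId"),
    call("MyFloor", $("transactionTime")).as("logTs"),
    $("amount"));

// The registered name is also usable from SQL:
Table sqlResult = tEnv.sqlQuery(
    "SELECT accountId, MyFloor(transactionTime) AS logTs, amount FROM transactions");
```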
```python
from datetime import datetime
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.TIMESTAMP(3))
def my_floor(timestamp: datetime) -> datetime:
    return timestamp.replace(minute=0, second=0, microsecond=0)
```
Use the UDF in your report:

```python
def report(transactions):
    return transactions \
        .select(
            col("accountId"),
            my_floor(col("transactionTime")).alias("logTs"),
            col("amount")) \
        .group_by(col("accountId"), col("logTs")) \
        .select(
            col("accountId"),
            col("logTs"),
            col("amount").sum.alias("amount"))
```
(Optional) Use tumbling windows
For finer-grained time bucketing, replace the floor call with a tumbling window. Windows are first-class citizens in Flink’s runtime and enable additional optimizations compared to a manual floor approach.

```java
import org.apache.flink.table.api.Tumble;
import static org.apache.flink.table.api.Expressions.lit;

public static Table report(Table transactions) {
    return transactions
        .window(Tumble.over(lit(10).seconds()).on($("transactionTime")).as("logTs"))
        .groupBy($("accountId"), $("logTs"))
        .select(
            $("accountId"),
            $("logTs").start().as("logTs"),
            $("amount").sum().as("amount"));
}
```
This creates 10-second tumbling windows. A transaction with transactionTime = 2024-01-01 01:23:47 is placed in the 2024-01-01 01:23:40 window.

```python
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble

def report(transactions):
    return transactions \
        .window(Tumble.over(lit(10).seconds).on(col("transactionTime")).alias("logTs")) \
        .group_by(col("accountId"), col("logTs")) \
        .select(
            col("accountId"),
            col("logTs").start.alias("logTs"),
            col("amount").sum.alias("amount"))
```
Run the application
Run the SpendReport class from your IDE (or, for Python, run python spend_report.py). Results are printed to the console as they are computed. Since the DataGen source is unbounded, the job runs continuously until you stop it.
The output shows running totals per account per hour, updating as new transactions arrive:

```
+----+----------------------+-------------------------+----------------------+
| op |            accountId |                   logTs |               amount |
+----+----------------------+-------------------------+----------------------+
| +I |                    3 | 2024-01-01 09:00:00.000 |                 4821 |
| +I |                    1 | 2024-01-01 09:00:00.000 |                 5103 |
| -U |                    3 | 2024-01-01 09:00:00.000 |                 4821 |
| +U |                    3 | 2024-01-01 09:00:00.000 |                 9644 |
```
The +I (insert), -U (update-before, which retracts the previous result), and +U (update-after, which emits the updated result) row kinds reflect that the aggregation result is continuously updated.
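If you consume the result programmatically instead of printing it, each Row carries its row kind. A Java sketch (the datagen job is unbounded, so this loop runs until the job is cancelled):

```java
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;

try (CloseableIterator<Row> it = result.execute().collect()) {
    while (it.hasNext()) {
        Row row = it.next();
        RowKind kind = row.getKind(); // INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE
        System.out.println(kind.shortString() + " " + row);
    }
}
```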
Complete programs
SpendReport.java
spend_report.py
```java
import org.apache.flink.table.api.*;
import org.apache.flink.table.expressions.TimeIntervalUnit;
import static org.apache.flink.table.api.Expressions.$;

public class SpendReport {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.createTemporaryTable("transactions",
            TableDescriptor.forConnector("datagen")
                .schema(Schema.newBuilder()
                    .column("accountId", DataTypes.BIGINT())
                    .column("amount", DataTypes.BIGINT())
                    .column("transactionTime", DataTypes.TIMESTAMP(3))
                    .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
                    .build())
                .option("rows-per-second", "100")
                .option("fields.accountId.min", "1")
                .option("fields.accountId.max", "5")
                .option("fields.amount.min", "1")
                .option("fields.amount.max", "1000")
                .build());

        Table transactions = tEnv.from("transactions");
        Table result = report(transactions);
        result.execute().print();
    }

    public static Table report(Table transactions) {
        return transactions
            .select(
                $("accountId"),
                $("transactionTime").floor(TimeIntervalUnit.HOUR).as("logTs"),
                $("amount"))
            .groupBy($("accountId"), $("logTs"))
            .select(
                $("accountId"),
                $("logTs"),
                $("amount").sum().as("amount"));
    }
}
```
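And the corresponding spend_report.py, assembled from the Python snippets above:

```python
from pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema, DataTypes
from pyflink.table.expression import TimeIntervalUnit
from pyflink.table.expressions import col


def report(transactions):
    return transactions \
        .select(
            col("accountId"),
            col("transactionTime").floor(TimeIntervalUnit.HOUR).alias("logTs"),
            col("amount")) \
        .group_by(col("accountId"), col("logTs")) \
        .select(
            col("accountId"),
            col("logTs"),
            col("amount").sum.alias("amount"))


def main():
    settings = EnvironmentSettings.in_streaming_mode()
    t_env = TableEnvironment.create(settings)
    t_env.get_config().set("parallelism.default", "1")

    t_env.create_temporary_table(
        "transactions",
        TableDescriptor.for_connector("datagen")
            .schema(Schema.new_builder()
                .column("accountId", DataTypes.BIGINT())
                .column("amount", DataTypes.BIGINT())
                .column("transactionTime", DataTypes.TIMESTAMP(3))
                .watermark("transactionTime", "transactionTime - INTERVAL '5' SECOND")
                .build())
            .option("rows-per-second", "100")
            .option("fields.accountId.min", "1")
            .option("fields.accountId.max", "5")
            .option("fields.amount.min", "1")
            .option("fields.amount.max", "1000")
            .build())

    transactions = t_env.from_path("transactions")
    result = report(transactions)
    result.execute().print()


if __name__ == '__main__':
    main()
```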
What’s next
Table API reference — Explore all operations, window types, and join semantics in the Table API documentation.
User-Defined Functions — Learn how to write scalar, table, and aggregate UDFs in the UDF guide.
SQL Quickstart — Run the same kind of aggregation query interactively from the SQL Client.
DataStream API Quickstart — Build a stateful fraud detection system with the lower-level DataStream API.