Load data from any SQL database supported by SQLAlchemy. The sql_database source automatically reflects table schemas, handles incremental loading, and supports multiple backends.
Quick Start
Load specific tables from a MySQL database:
import dlt
from dlt.sources.sql_database import sql_database
from dlt.sources.credentials import ConnectionStringCredentials

pipeline = dlt.pipeline(
    pipeline_name="rfam",
    destination="duckdb",
    dataset_name="rfam_data",
)

credentials = ConnectionStringCredentials(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
)

# Load specific tables
source = sql_database(credentials).with_resources("family", "clan")

load_info = pipeline.run(source, write_disposition="replace")
print(load_info)
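Instead of hardcoding the connection string, credentials can also be resolved from dlt's configuration. A minimal sketch using the standard environment variable (a secrets.toml entry works the same way); it reuses the pipeline and imports from the snippet above:

import os

# Let dlt resolve the connection string from config instead of passing it in code
os.environ["SOURCES__SQL_DATABASE__CREDENTIALS"] = (
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
)
source = sql_database().with_resources("family", "clan")
load_info = pipeline.run(source, write_disposition="replace")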
Loading Strategies
Select Specific Tables
Choose which tables to load:
# Load only specific tables
source = sql_database(credentials).with_resources(
    "family",
    "clan",
    "genome",
)
pipeline.run(source)
Load Entire Database
Reflect and load all tables:
# Load all tables from the database
source = sql_database(credentials)
pipeline.run(source, write_disposition="replace")
For large databases, this may take a while. Consider loading specific tables first.
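To see what a full reflection would pull in before committing to it, you can list the reflected resources first. A minimal sketch; it assumes the credentials object from the Quick Start, and each resource corresponds to one table:

source = sql_database(credentials)
# Every reflected table becomes a resource on the source
print(list(source.resources.keys()))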
Configure Incremental Loading
Add incremental loading to track changes:
source = sql_database(credentials).with_resources("family", "clan")

# Add incremental config
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)

# Merge updates into existing data
pipeline.run(source, write_disposition="merge")
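On its first run an incremental resource reads the full history of the cursor column. If you only need recent rows, you can bound it with an initial_value; a sketch, where the cutoff date is purely illustrative:

from dlt.common import pendulum

# Assumption for illustration: only rows updated after 2024-01-01 are wanted
source.family.apply_hints(
    incremental=dlt.sources.incremental(
        "updated", initial_value=pendulum.datetime(2024, 1, 1)
    )
)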
Using sql_table for Single Tables
Load individual tables with fine-grained control:
import dlt
from dlt.sources.sql_database import sql_table
from dlt.sources.credentials import ConnectionStringCredentials

pipeline = dlt.pipeline(
    pipeline_name="rfam_database",
    destination="duckdb",
    dataset_name="rfam_data",
)

# Load a single table with incremental loading
family = sql_table(
    credentials=ConnectionStringCredentials(
        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    ),
    table="family",
    incremental=dlt.sources.incremental("updated"),
    reflection_level="full_with_precision",
    defer_table_reflect=True,  # reflect the table schema at run time, not at resource creation
)

# Load another table (credentials can also be passed as a plain connection string)
genome = sql_table(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table="genome",
    reflection_level="full_with_precision",
)

# Load both tables together
pipeline.run([family, genome], write_disposition="merge")
Incremental Loading Patterns
dlt supports three write dispositions for these patterns: merge (upsert), append (insert only), and replace (full refresh). The example below uses merge; sketches of the other two follow it.
# For tables with updates to existing rows
source = sql_database(credentials).with_resources("family", "clan")
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
pipeline.run(source, write_disposition="merge")
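For completeness, sketches of the append and replace dispositions, reusing the pipeline, credentials, and Rfam column names from the examples on this page:

# Append: insert-only tables, tracked by a creation timestamp
source = sql_database(credentials).with_resources("genome")
source.genome.apply_hints(
    incremental=dlt.sources.incremental("created")
)
pipeline.run(source, write_disposition="append")

# Replace: full refresh on every run, no incremental hints needed
source = sql_database(credentials).with_resources("features", "author")
pipeline.run(source, write_disposition="replace")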
Date Range Loading
Load data within a specific date range:
from dlt.common import pendulum

start_date = pendulum.now().subtract(years=1)
end_date = pendulum.now()

family = sql_table(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table="family",
    incremental=dlt.sources.incremental(
        "updated",
        initial_value=start_date,
        end_value=end_date,
        row_order="desc",  # rows are read in descending cursor order
    ),
    chunk_size=10,
)

# Load only one chunk (10 records)
pipeline.run(family.add_limit(1))
Backend Options
PyArrow Backend (Fast)
Use PyArrow for better performance:
import sqlalchemy as sa

def _double_as_decimal_adapter(table: sa.Table) -> sa.Table:
    """Emit double columns as floats instead of decimals."""
    for column in table.columns.values():
        if hasattr(sa, "Double") and isinstance(column.type, sa.Double):
            column.type.asdecimal = False
    return table

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    backend="pyarrow",
    table_adapter_callback=_double_as_decimal_adapter,
).with_resources("family", "genome")

pipeline.run(source)
Pandas Backend
Use Pandas for specific data processing needs:
source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    backend="pandas",
    chunk_size=100000,
    backend_kwargs={
        "coerce_float": False,
        "dtype_backend": "numpy_nullable",
    },
    reflection_level="full_with_precision",
).with_resources("family", "genome")

pipeline.run(source)
ConnectorX Backend (Ultra Fast)
For maximum performance on large datasets:
unsw_table = sql_table(
    "postgresql://loader:loader@localhost:5432/dlt_data",
    "unsw_flow_7",
    "speed_test",  # schema name
    backend="connectorx",
    reflection_level="full_with_precision",
    backend_kwargs={
        "conn": "postgresql://loader:loader@localhost:5432/dlt_data"
    },
)

pipeline = dlt.pipeline(
    pipeline_name="unsw_download",
    destination="filesystem",
    progress="log",
)

pipeline.run(
    unsw_table,
    dataset_name="speed_test",
    loader_file_format="parquet",
)
Column Selection
Load only specific columns from tables:
import os

# Specify columns via environment variable
os.environ["SOURCES__SQL_DATABASE__FAMILY__INCLUDED_COLUMNS"] = '["rfam_acc", "description"]'

source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    backend="pyarrow",
    reflection_level="full_with_precision",
).with_resources("family", "genome")

pipeline.run(source)
Or in .dlt/config.toml:
[sources.sql_database.family]
included_columns = ["rfam_acc", "description"]
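The same selection can also be passed directly in code. A sketch, assuming the included_columns argument of sql_table as exposed in recent dlt versions:

family = sql_table(
    credentials="mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam",
    table="family",
    included_columns=["rfam_acc", "description"],  # load only these two columns
)
pipeline.run(family)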
Type Adapters
Customize how SQL types are converted:
import sqlalchemy as sa

def type_adapter(sql_type):
    """Convert SQL arrays to JSON"""
    if isinstance(sql_type, sa.ARRAY):
        return sa.JSON()
    return sql_type

source = sql_database(
    "postgresql://loader:loader@localhost:5432/dlt_data",
    backend="pyarrow",
    type_adapter_callback=type_adapter,
    reflection_level="full_with_precision",
).with_resources("table_with_array_column")

pipeline.run(source)
Complete Example: Mixed Load
import dlt
from dlt.sources.sql_database import sql_database
from dlt.sources.credentials import ConnectionStringCredentials

def load_database():
    pipeline = dlt.pipeline(
        pipeline_name="rfam",
        destination="duckdb",
        dataset_name="rfam_data",
    )
    credentials = ConnectionStringCredentials(
        "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
    )

    # Load incrementally with merge
    source_1 = sql_database(credentials).with_resources("family", "clan")
    source_1.family.apply_hints(incremental=dlt.sources.incremental("updated"))
    source_1.clan.apply_hints(incremental=dlt.sources.incremental("updated"))
    pipeline.run(source_1, write_disposition="merge")

    # Load with replace
    source_2 = sql_database(credentials).with_resources("features", "author")
    pipeline.run(source_2, write_disposition="replace")

    # Load append-only table
    source_3 = sql_database(credentials).with_resources("genome")
    source_3.genome.apply_hints(incremental=dlt.sources.incremental("created"))
    pipeline.run(source_3, write_disposition="append")

if __name__ == "__main__":
    load_database()
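After the run you can spot-check what landed in DuckDB through the pipeline's SQL client. A small sketch; the table name assumes the family load above, and it re-attaches to the pipeline by name:

# Re-attach to the pipeline and count rows in a loaded table
pipeline = dlt.pipeline(pipeline_name="rfam", destination="duckdb", dataset_name="rfam_data")
with pipeline.sql_client() as client:
    print(client.execute_sql("SELECT count(*) FROM family"))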
Next Steps
Incremental Loading: configure incremental loading strategies.
Schema Evolution: handle schema changes over time.