Incremental loading enables you to load only new or changed data instead of reloading everything. This reduces costs, improves performance, and enables real-time data pipelines.

Write Dispositions

dlt supports three write dispositions that control how data is loaded:
1. Replace (Full Load)

Replaces all data in the destination with new data from the source.
pipeline.run(source(), write_disposition="replace")
Use for small datasets or when you need a complete refresh.
2. Append (Insert Only)

Adds new data to the destination without modifying existing records.
pipeline.run(source(), write_disposition="append")
Use for immutable event data like logs, clicks, or transactions.
3. Merge (Upsert)

Updates existing records and inserts new ones based on primary/merge keys.
@dlt.resource(primary_key="id", write_disposition="merge")
def users():
    yield [{"id": 1, "name": "Alice", "status": "active"}]
Use for stateful data that changes over time like user profiles or product catalogs.
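For example, running the users resource above twice leaves a single row for id=1. A minimal sketch, assuming a local DuckDB destination (the pipeline and dataset names are illustrative):
import dlt

pipeline = dlt.pipeline(
    pipeline_name="users_demo",   # illustrative name
    destination="duckdb",
    dataset_name="app_data",      # illustrative name
)

# The first run inserts the row for id=1; the second run matches on the
# primary key and updates that row instead of appending a duplicate.
pipeline.run(users())
pipeline.run(users())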

Cursor-Based Incremental Loading

Track changes using a cursor field (timestamp, ID, version number):
import dlt
from dlt.sources.helpers import requests

@dlt.resource(primary_key="id")
def repo_issues(
    repository: str,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="1970-01-01T00:00:00Z"
    )
):
    """Load GitHub issues incrementally"""
    url = f"https://api.github.com/repos/{repository}/issues"
    
    # Use start_value to get only new/updated issues
    response = requests.get(url, params={
        "since": updated_at.start_value,
        "state": "all",
        "per_page": 100,
    })
    response.raise_for_status()
    
    yield response.json()
    
    # last_value is automatically tracked
    print(f"Latest update: {updated_at.last_value}")

pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

# First run: loads all issues since 1970
load_info = pipeline.run(repo_issues("dlt-hub/dlt"))

# Second run: loads only new/updated issues
load_info = pipeline.run(repo_issues("dlt-hub/dlt"))

How It Works

# The incremental instance provides:

# initial_value: Starting point (never changes)
updated_at.initial_value  # "1970-01-01T00:00:00Z"

# start_value: Max value from previous run or initial_value
updated_at.start_value  # "2024-01-01T10:30:00Z"

# last_value: Real-time max value (updates with each item)
updated_at.last_value  # "2024-01-15T14:22:00Z"

# end_value: Optional end point for backfills
updated_at.end_value  # "2024-02-01T00:00:00Z"

Automatic Deduplication

When an API doesn’t support server-side filtering, dlt filters and deduplicates the records client-side:
@dlt.resource(
    primary_key="id",
    table_name=lambda i: i["type"]  # Dynamic table names
)
def repo_events(
    repository: str,
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="1970-01-01T00:00:00Z",
        row_order="desc"  # Optimize: stop when out of range
    )
):
    """GitHub events API returns all events - dlt filters them"""
    url = f"https://api.github.com/repos/{repository}/events"
    
    # API doesn't support 'since' parameter
    response = requests.get(url, params={"per_page": 100})
    response.raise_for_status()
    
    # Yield all data - dlt automatically:
    # 1. Filters out old events using created_at
    # 2. Deduplicates using primary_key (id)
    # 3. Stops pagination when row_order="desc" and out of range
    yield response.json()

Custom Last Value Function

Use custom logic to determine the “last” value:
# Track max value per table type
def by_event_type(event):
    if len(event) == 1:
        item, = event
        last_value = {}
    else:
        item, last_value = event
        last_value = dict(last_value)
    
    item_type = item["type"]
    last_value[item_type] = max(
        item["created_at"],
        last_value.get(item_type, "1970-01-01T00:00:00Z")
    )
    return last_value

@dlt.resource(
    primary_key="id",
    table_name=lambda i: i["type"]
)
def events(
    created_at=dlt.sources.incremental(
        "$",  # Full item as cursor
        last_value_func=by_event_type
    )
):
    # Load data...
    pass
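With this setup, the tracked state is a dictionary keyed by event type rather than a single scalar. After a run it might look like the following (the timestamps are purely illustrative):
# Shape of the tracked cursor state after a run (values are made up)
created_at.last_value
# {"PushEvent": "2024-01-15T14:22:00Z", "IssuesEvent": "2024-01-14T09:05:00Z"}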

Backfill with Date Ranges

Load historical data in ranges without affecting incremental state:
@dlt.resource(primary_key="id")
def repo_issues(
    repository: str,
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="1970-01-01T00:00:00Z"
    )
):
    url = f"https://api.github.com/repos/{repository}/issues"
    
    response = requests.get(url, params={
        "since": updated_at.start_value,
        "until": updated_at.end_value,  # Optional end date
        "state": "all",
    })
    response.raise_for_status()
    yield response.json()

# Normal incremental load
pipeline.run(repo_issues("dlt-hub/dlt"))

# Backfill specific ranges (stateless)
july_issues = repo_issues(
    "dlt-hub/dlt",
    updated_at=dlt.sources.incremental(
        initial_value="2024-07-01T00:00:00Z",
        end_value="2024-08-01T00:00:00Z"
    )
)

august_issues = repo_issues(
    "dlt-hub/dlt",
    updated_at=dlt.sources.incremental(
        initial_value="2024-08-01T00:00:00Z",
        end_value="2024-09-01T00:00:00Z"
    )
)

# Run backfills in parallel (doesn't affect state)
pipeline.run([july_issues, august_issues])
Backfills with end_value are stateless and can run in parallel without affecting the main incremental state.

Lag/Attribution Window

Handle late-arriving data by reprocessing recent records:
@dlt.resource(primary_key="id")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        lag=3600,  # Reprocess last hour (in seconds)
    )
):
    """Reprocess last hour of data to catch late arrivals"""
    # If last_value was 2024-01-15 14:00:00
    # start_value will be 2024-01-15 13:00:00 (1 hour earlier)
    # fetch_events is a placeholder for your API call
    yield fetch_events(since=created_at.start_value)

Row Order Optimization

Stop fetching data early when rows are ordered:
@dlt.resource(primary_key="id")
def events(
    created_at=dlt.sources.incremental(
        "created_at",
        initial_value="2024-01-01T00:00:00Z",
        row_order="desc",  # Newest first
    )
):
    """API returns newest events first"""
    # dlt stops iterating when created_at < start_value
    # Saves API calls and processing time
    # paginate_events is a placeholder for your paginated API call
    for page in paginate_events():
        yield page

Full Refresh

Force a complete reload when needed:
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

# Normal incremental load
pipeline.run(repo_issues("dlt-hub/dlt"))

# Force full refresh - deletes and reloads
pipeline.run(repo_issues("dlt-hub/dlt"), write_disposition="replace")

# Refresh just one resource of a source (with_resources is a method of a
# @dlt.source; github_source() is a placeholder for a source wrapping repo_issues)
pipeline.run(
    github_source().with_resources("issues"),
    write_disposition="replace"
)

SQL Database Incremental Loading

import dlt
from dlt.sources.sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="database_sync",
    destination="duckdb",
    dataset_name="db_data",
)

# Load with incremental
source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
).with_resources("family", "clan")

# Configure incremental on timestamp columns
source.family.apply_hints(
    incremental=dlt.sources.incremental("updated")
)
source.clan.apply_hints(
    incremental=dlt.sources.incremental("updated")
)

# Merge changes into destination
load_info = pipeline.run(source, write_disposition="merge")
print(load_info)
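The first sync picks up all rows. To bound it, the same initial_value option shown earlier also applies here; a minimal sketch, assuming the "updated" column is a datetime (the date is illustrative):
from dlt.common.pendulum import pendulum

# Only pick up rows updated after this point on the first run
source.family.apply_hints(
    incremental=dlt.sources.incremental(
        "updated",
        initial_value=pendulum.datetime(2024, 1, 1),
    )
)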

REST API Incremental Loading

import dlt
from dlt.sources.rest_api import rest_api_source
from dlt.common.pendulum import pendulum

config = {
    "client": {
        "base_url": "https://api.github.com/repos/dlt-hub/dlt/",
    },
    "resources": [
        {
            "name": "issues",
            "endpoint": {
                "path": "issues",
                "params": {
                    "since": "{incremental.start_value}",
                    "state": "all",
                },
                "incremental": {
                    "cursor_path": "updated_at",
                    "initial_value": pendulum.today().subtract(days=30).to_iso8601_string(),
                },
            },
        },
    ],
}

source = rest_api_source(config)

pipeline = dlt.pipeline(
    pipeline_name="github_rest",  # illustrative name
    destination="duckdb",
    dataset_name="github_data",
)
pipeline.run(source)

Best Practices

  • Use updated_at for data that changes (users, orders)
  • Use created_at for append-only data (events, logs)
  • Ensure the cursor field is indexed in the source
  • Verify the cursor field is always populated
  • Always use UTC timestamps
  • Ensure source and destination use the same timezone
  • Use timezone-aware datetime objects
  • Don’t use far-past dates (e.g., “1970-01-01”) unless necessary
  • Start from a reasonable date to limit the initial load
  • Consider using end_value for bounded initial loads (see the sketch below)
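A minimal sketch of a bounded initial load, assuming the repo_issues resource and pipeline defined earlier (the dates are illustrative):
# Bound the first load to a fixed window instead of loading everything since 1970
initial_load = repo_issues(
    "dlt-hub/dlt",
    updated_at=dlt.sources.incremental(
        initial_value="2024-01-01T00:00:00Z",
        end_value="2024-06-01T00:00:00Z",
    )
)
pipeline.run(initial_load)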
# Check incremental state
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
)

# View current state
print(pipeline.state)

# Reset state if needed
pipeline.drop()

Next Steps

  • Schema Evolution: Handle changing schemas automatically
  • Data Contracts: Validate and enforce data quality