
The dlt.sources.helpers module provides utilities for common data loading tasks, including HTTP clients, REST API pagination, and data transformations.

REST Client

A powerful declarative REST API client with automatic pagination, authentication, and request handling.

Import

from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client.auth import AuthConfigBase
from dlt.sources.helpers.rest_client.paginators import BasePaginator

RESTClient

Declarative REST API client with built-in pagination and authentication support.
import dlt
from dlt.sources.helpers.rest_client import RESTClient

client = RESTClient(
    base_url="https://api.github.com",
    headers={"Accept": "application/vnd.github+json"}
)

@dlt.resource
def issues():
    # Iterate over paginated results
    for page in client.paginate("/repos/dlt-hub/dlt/issues"):
        yield page
Key Features:
  • Automatic pagination with multiple strategies (offset, cursor, header-based)
  • Built-in authentication (Bearer, API key, OAuth); see the configuration sketch after this list
  • Configurable retry logic
  • Response hooks for custom processing
  • JSON path selectors for extracting data from responses
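
A minimal configuration sketch, assuming the BearerTokenAuth and HeaderLinkPaginator classes from the rest_client.auth and rest_client.paginators modules (GitHub exposes next-page URLs via the Link response header):
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

client = RESTClient(
    base_url="https://api.github.com",
    # Adds an "Authorization: Bearer <token>" header to every request
    auth=BearerTokenAuth(token="YOUR_TOKEN"),  # placeholder token
    # Follows next-page URLs from the Link response header
    paginator=HeaderLinkPaginator(),
)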

paginate() Function

Simple function for quick pagination without creating a client instance.
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource
def github_issues():
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={"state": "open", "per_page": 100}
    ):
        yield page
Parameters:
  • url (str, required): URL to paginate over.
  • method (str, default "GET"): HTTP method: "GET", "POST", "PUT", "PATCH", or "DELETE".
  • headers (Dict[str, str]): HTTP headers to send with requests.
  • params (Dict[str, Any]): Query parameters.
  • json (Dict[str, Any]): JSON body for POST/PUT/PATCH requests.
  • auth (AuthConfigBase): Authentication configuration.
  • paginator (BasePaginator): Paginator instance to use for pagination logic (see the sketch below).
  • data_selector (str): JSON path to extract data from each page, e.g. "data", "results", "items[*]".

Returns: Iterator of pages.
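
The paginator and data_selector parameters can be passed explicitly when automatic detection is not enough. A hedged sketch, assuming an OffsetPaginator class in rest_client.paginators and a hypothetical endpoint whose responses look like {"items": [...], "total": ...}:
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator

for page in paginate(
    "https://api.example.com/items",       # hypothetical endpoint
    paginator=OffsetPaginator(limit=100),  # advances offset/limit query parameters
    data_selector="items",                 # extract the "items" array from each page
):
    print(page)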

HTTP Requests

A pre-configured requests client with automatic retries and timeout handling.

Import

from dlt.sources.helpers import requests

Functions

All standard functions from the requests library are available, with automatic retries and dlt's configuration applied:
from dlt.sources.helpers import requests

# GET request with automatic retries
response = requests.get(
    "https://api.github.com/users/octocat",
    headers={"Authorization": "Bearer token"}
)
data = response.json()

# POST request
response = requests.post(
    "https://api.example.com/data",
    json={"key": "value"}
)
Available functions:
  • requests.get(url, **kwargs)
  • requests.post(url, **kwargs)
  • requests.put(url, **kwargs)
  • requests.patch(url, **kwargs)
  • requests.delete(url, **kwargs)
  • requests.head(url, **kwargs)
  • requests.options(url, **kwargs)
  • requests.request(method, url, **kwargs)
Features:
  • Automatic retry on failure (configurable via RuntimeConfiguration; see the sketch after this list)
  • Configurable timeouts
  • Same API as standard requests library
  • Thread-safe
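
A sketch of tuning the retry behavior, under the assumption that the standard dlt runtime settings (request_max_attempts, request_timeout) can be supplied as environment variables, which dlt reads when it initializes its runtime configuration:
import os

# Assumption: RuntimeConfiguration resolves these keys from the environment
os.environ["RUNTIME__REQUEST_MAX_ATTEMPTS"] = "10"  # retry each request up to 10 times
os.environ["RUNTIME__REQUEST_TIMEOUT"] = "120"      # per-request timeout in seconds

from dlt.sources.helpers import requests

response = requests.get("https://api.example.com/data")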

Client

For custom configuration, create a Client instance:
from dlt.sources.helpers.requests import Client

client = Client()
# `config` is a dlt RuntimeConfiguration instance with timeout/retry settings
client.update_from_config(config)

response = client.get("https://api.example.com/data")

Session

For persistent sessions with connection pooling:
from dlt.sources.helpers.requests import Session

with Session() as session:
    session.headers.update({"Authorization": "Bearer token"})
    response = session.get("https://api.example.com/data")

Data Transformations

Helper functions for transforming data items in resources.

Import

from dlt.sources.helpers.transform import take_first, skip_first, pivot, add_row_hash_to_table

take_first()

Filter that takes only the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import take_first

@dlt.resource
def limited_data():
    yield from range(1000)

# take_first is a filter, so attach it with add_filter; keep only the first 100 items
pipeline.run(
    limited_data().add_filter(take_first(100))
)
Parameters:
  • max_items (int, required): Maximum number of items to take.

skip_first()

Filter that skips the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import skip_first

@dlt.resource
def data_without_header():
    yield from all_rows  # including the header row

# skip_first is a filter, so attach it with add_filter; drop the header row
pipeline.run(
    data_without_header().add_filter(skip_first(1))
)
Parameters:
  • max_items (int, required): Number of items to skip.

pivot()

Transform sequences of sequences into sequences of dictionaries.
import dlt
from dlt.sources.helpers.transform import pivot

# Input:  {"data": [[1, 2, 3], [4, 5, 6]]}
# Output: {"data": [{"col_0": 1, "col_1": 2, "col_2": 3},
#                   {"col_0": 4, "col_1": 5, "col_2": 6}]}

@dlt.resource
def matrix_data():
    yield {"data": [[1, 2, 3], [4, 5, 6]]}

pipeline.run(
    matrix_data().add_map(pivot(paths="data", prefix="col_"))
)
Parameters:
  • paths (str | List[str], default "$"): JSON paths to the fields to pivot. Use "$" for root-level arrays.
  • prefix (str, default "col"): Prefix for generated column names.

add_row_hash_to_table()

Computes and adds a content hash for each row of a Pandas DataFrame or Arrow table.
import dlt
import pandas as pd

from dlt.sources.helpers.transform import add_row_hash_to_table

@dlt.resource
def users_df():
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })
    yield df

# Add a row_hash column containing a content hash per row
pipeline.run(
    users_df().add_map(add_row_hash_to_table("row_hash"))
)
Parameters:
  • row_hash_column_name (str, required): Name of the column to add with row hashes.
Use Cases:
  • SCD2 (Slowly Changing Dimension Type 2) tracking
  • Change detection
  • Deduplication based on content (see the sketch after this list)
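
A dedup sketch, under the assumption that merging on the hash column is the desired behavior: with row_hash as the merge key, rows with identical content are written only once across loads.
import dlt
import pandas as pd

from dlt.sources.helpers.transform import add_row_hash_to_table

# Merging on row_hash deduplicates rows with identical content
@dlt.resource(primary_key="row_hash", write_disposition="merge")
def users():
    yield pd.DataFrame({"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]})

pipeline.run(users().add_map(add_row_hash_to_table("row_hash")))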

Complete Example

Combining multiple helpers:
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.transform import take_first

@dlt.resource(
    primary_key="id",
    write_disposition="merge"
)
def github_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # Paginate over GitHub API
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "state": "all",
            "since": updated_at.last_value,
            "per_page": 100
        },
        headers={"Accept": "application/vnd.github+json"},
        data_selector="$"  # Results are at root level
    ):
        yield page

# Run pipeline with limited items for testing
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data"
)

pipeline.run(
    github_issues().add_filter(take_first(50))  # take_first is a filter; limit to 50 items
)
