Documentation Index
Fetch the complete documentation index at: https://mintlify.com/dlt-hub/dlt/llms.txt
Use this file to discover all available pages before exploring further.
The dlt.sources.helpers module provides utilities for common data loading tasks, including HTTP clients, REST API pagination, and data transformations.
REST Client
A powerful declarative REST API client with automatic pagination, authentication, and request handling.
Import
from dlt.sources.helpers.rest_client import RESTClient, paginate
from dlt.sources.helpers.rest_client.auth import AuthConfigBase
from dlt.sources.helpers.rest_client.paginators import BasePaginator
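AuthConfigBase and BasePaginator are the base classes for plugging in custom authentication and pagination schemes. A minimal sketch of a custom page-number paginator, assuming the update_state/update_request hooks and the _has_next_page flag exposed by recent dlt versions (the page_param name is illustrative):

from typing import Any, List, Optional

from requests import Request, Response

from dlt.sources.helpers.rest_client.paginators import BasePaginator

class SimplePageParamPaginator(BasePaginator):
    # Advances a "page" query parameter until a page comes back empty
    def __init__(self, page_param: str = "page"):
        super().__init__()
        self.page_param = page_param
        self.page = 1

    def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
        # Stop when the API returns an empty page
        if not response.json():
            self._has_next_page = False
        else:
            self.page += 1

    def update_request(self, request: Request) -> None:
        # Inject the current page number into the query string
        request.params = request.params or {}
        request.params[self.page_param] = self.page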
RESTClient
Declarative REST API client with built-in pagination and authentication support.
import dlt
from dlt.sources.helpers.rest_client import RESTClient

client = RESTClient(
    base_url="https://api.github.com",
    headers={"Accept": "application/vnd.github+json"}
)

@dlt.resource
def github_issues():
    # Iterate over paginated results
    for page in client.paginate("/repos/dlt-hub/dlt/issues"):
        yield page
Key Features:
- Automatic pagination with multiple strategies (offset, cursor, header-based)
- Built-in authentication (Bearer, API key, OAuth)
- Configurable retry logic
- Response hooks for custom processing
- JSON path selectors for extracting data from responses (see the configuration sketch after this list)
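A sketch combining several of these features; BearerTokenAuth, HeaderLinkPaginator, and the auth, paginator, and data_selector arguments are part of dlt's REST client, while the endpoint, secret key, and field names below are placeholders:

import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

client = RESTClient(
    base_url="https://api.example.com",
    auth=BearerTokenAuth(token=dlt.secrets["sources.example.token"]),
    paginator=HeaderLinkPaginator(),  # follows Link response headers
    data_selector="results",  # extract items from {"results": [...]} envelopes
)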
paginate() Function
Simple function for quick pagination without creating a client instance.
import dlt
from dlt.sources.helpers.rest_client import paginate

@dlt.resource
def github_issues():
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={"state": "open", "per_page": 100}
    ):
        yield page
Parameters:
- method: the HTTP method, one of "GET", "POST", "PUT", "PATCH", "DELETE"
- headers: HTTP headers to send with each request.
- json: JSON body for POST/PUT/PATCH requests.
- auth: authentication configuration.
- paginator: paginator instance to use for pagination logic.
- data_selector: JSON path to extract data from each page, e.g. "data", "results", "items[*]" (see the sketch below).
Returns: Iterator of pages.
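For example, an endpoint that wraps results in an envelope can be consumed by pointing data_selector at the payload; the URL, token, and field names here are illustrative:

from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# Iterate pages of {"data": [...]} responses, authenticating with a bearer token
for page in paginate(
    "https://api.example.com/v1/orders",
    auth=BearerTokenAuth(token="<your token>"),
    data_selector="data",
):
    print(len(page))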
HTTP Requests
A pre-configured requests client with automatic retries and timeout handling.
Import
from dlt.sources.helpers import requests
Functions
All standard requests library functions with automatic retry and configuration:
from dlt.sources.helpers import requests
# GET request with automatic retries
response = requests.get(
    "https://api.github.com/users/octocat",
    headers={"Authorization": "Bearer token"}
)
data = response.json()
# POST request
response = requests.post(
    "https://api.example.com/data",
    json={"key": "value"}
)
Available functions:
requests.get(url, **kwargs)
requests.post(url, **kwargs)
requests.put(url, **kwargs)
requests.patch(url, **kwargs)
requests.delete(url, **kwargs)
requests.head(url, **kwargs)
requests.options(url, **kwargs)
requests.request(method, url, **kwargs)
Features:
- Automatic retry on failure (configurable via RuntimeConfiguration)
- Configurable timeouts (see the sketch after this list)
- Same API as the standard requests library
- Thread-safe
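Because the API mirrors requests, per-call options such as timeout and raise_for_status work as usual. A minimal sketch with a placeholder URL:

from dlt.sources.helpers import requests

# A per-request timeout overrides the configured default;
# retries still apply to transient failures
response = requests.get("https://api.example.com/data", timeout=10)
response.raise_for_status()
payload = response.json()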
Client
For custom configuration, create a Client instance:
from dlt.sources.helpers.requests import Client
client = Client()
client.update_from_config(config)  # config is a dlt RuntimeConfiguration
response = client.get("https://api.example.com/data")
Session
For persistent sessions with connection pooling:
from dlt.sources.helpers.requests import Session
with Session() as session:
    session.headers.update({"Authorization": "Bearer token"})
    response = session.get("https://api.example.com/data")
Transform Functions
Helper functions for transforming data items in resources.
Import
from dlt.sources.helpers.transform import take_first, skip_first, pivot
take_first()
Filter that takes only the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import take_first

@dlt.resource
def limited_data():
    yield from range(1000)

# Take only the first 100 items; take_first returns a predicate,
# so it is applied with add_filter
pipeline.run(
    limited_data().add_filter(take_first(100))
)
max_items: Maximum number of items to take.
skip_first()
Filter that skips the first N items from a resource.
import dlt
from dlt.sources.helpers.transform import skip_first

@dlt.resource
def data_without_header():
    yield from all_rows  # all_rows includes a header row

# Skip the header row; skip_first is likewise a predicate, applied with add_filter
pipeline.run(
    data_without_header().add_filter(skip_first(1))
)
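The two filters compose: chaining skip_first and take_first selects a window of items, assuming each filter only sees items that passed the previous one:

import dlt
from dlt.sources.helpers.transform import skip_first, take_first

@dlt.resource
def numbers():
    yield from range(1000)

# Keep items 100-199: skip the first 100, then take the next 100
windowed = numbers().add_filter(skip_first(100)).add_filter(take_first(100))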
pivot()
Transform sequences of sequences into sequences of dictionaries.
import dlt
from dlt.sources.helpers.transform import pivot

# Input:  {"data": [[1, 2, 3], [4, 5, 6]]}
# Output: {"data": [{"col_0": 1, "col_1": 2, "col_2": 3}, {"col_0": 4, "col_1": 5, "col_2": 6}]}
@dlt.resource
def matrix_data():
    yield {"data": [[1, 2, 3], [4, 5, 6]]}

pipeline.run(
    matrix_data().add_map(pivot(paths="data", prefix="col_"))
)
Parameters:
- paths (str | List[str], default "$"): JSON paths to the fields to pivot. Use "$" for root-level arrays (see the sketch below).
- prefix (str): Prefix for generated column names.
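With the default path "$", pivot treats each yielded item itself as the array of rows; a minimal sketch, assuming that root-level behavior:

import dlt
from dlt.sources.helpers.transform import pivot

@dlt.resource
def rows():
    # Each item is a root-level list of row lists
    yield [[1, 2], [3, 4]]

# Produces [{"col_0": 1, "col_1": 2}, {"col_0": 3, "col_1": 4}]
pipeline.run(rows().add_map(pivot(paths="$", prefix="col_")))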
add_row_hash_to_table()
Compute and add a content hash for each row of a pandas DataFrame or Arrow table.
import dlt
import pandas as pd
from dlt.sources.helpers.transform import add_row_hash_to_table

@dlt.resource
def users_df():
    df = pd.DataFrame({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"]
    })
    yield df

# Add a row_hash column
pipeline.run(
    users_df().add_map(add_row_hash_to_table("row_hash"))
)
row_hash_column_name: Name of the column to add with row hashes.
Use Cases:
- SCD2 (Slowly Changing Dimension Type 2) tracking
- Change detection
- Deduplication based on content (see the Arrow sketch after this list)
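The same transform works on pyarrow tables, which is useful when a resource yields Arrow data directly; a minimal sketch with illustrative table contents:

import dlt
import pyarrow as pa
from dlt.sources.helpers.transform import add_row_hash_to_table

@dlt.resource
def users_arrow():
    yield pa.table({
        "id": [1, 2, 3],
        "name": ["Alice", "Bob", "Charlie"],
    })

# Each row gets a content hash in the "row_hash" column,
# enabling change detection between loads
pipeline.run(
    users_arrow().add_map(add_row_hash_to_table("row_hash"))
)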
Complete Example
Combining multiple helpers:
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.transform import take_first

@dlt.resource(
    primary_key="id",
    write_disposition="merge"
)
def github_issues(
    updated_at=dlt.sources.incremental("updated_at", initial_value="2024-01-01T00:00:00Z")
):
    # Paginate over the GitHub API
    for page in paginate(
        "https://api.github.com/repos/dlt-hub/dlt/issues",
        params={
            "state": "all",
            "since": updated_at.last_value,
            "per_page": 100
        },
        headers={"Accept": "application/vnd.github+json"},
        data_selector="$"  # Results are at the root level
    ):
        yield page

# Run the pipeline with a limited number of items for testing
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data"
)
pipeline.run(
    github_issues().add_filter(take_first(50))  # Limit to 50 items
)