Custom Sources
Create custom data sources to load data from any API, database, or other data origin. dlt provides decorators and utilities that make it easy to build production-ready sources with minimal code.

Basic Custom Source
Create a simple source using Python generators.

Create a resource function
Define a function that yields data:
import dlt

@dlt.resource(name="users", write_disposition="replace")
def get_users():
    """Fetch user data"""
    users = [
        {"id": 1, "name": "Alice", "email": "alice@example.com"},
        {"id": 2, "name": "Bob", "email": "bob@example.com"},
        {"id": 3, "name": "Charlie", "email": "charlie@example.com"},
    ]
    yield users
Wrap in a source
Group resources in a source function:

@dlt.source
def my_source():
    """Custom data source"""
    return get_users()
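To run the source, hand it to a pipeline, as the later examples do; the pipeline, destination, and dataset names below are illustrative:

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="my_data",
)

# Extract, normalize, and load everything yielded by the source
load_info = pipeline.run(my_source())
print(load_info)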
Loading from APIs
Simple API Source
Load data from a REST API:

import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="posts", write_disposition="replace")
def get_posts():
    """Fetch posts from JSONPlaceholder API"""
    response = requests.get("https://jsonplaceholder.typicode.com/posts")
    response.raise_for_status()
    yield response.json()

@dlt.source
def jsonplaceholder():
    """Load data from JSONPlaceholder API"""
    return get_posts()

pipeline = dlt.pipeline(
    pipeline_name="jsonplaceholder",
    destination="duckdb",
    dataset_name="jsonplaceholder_data",
)

load_info = pipeline.run(jsonplaceholder())
print(load_info)
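After the run you can inspect what was loaded with the pipeline's SQL client (a minimal sketch; the table name posts follows the resource name):

with pipeline.sql_client() as client:
    # Query the destination table created from the "posts" resource
    rows = client.execute_sql("SELECT id, title FROM posts LIMIT 5")
    print(rows)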
API with Authentication
Add authentication to your API requests:

import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="repos", write_disposition="merge", primary_key="id")
def get_github_repos(api_token: str = dlt.secrets.value):
    """Fetch repositories from GitHub API"""
    headers = {
        "Authorization": f"Bearer {api_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    response = requests.get(
        "https://api.github.com/user/repos",
        headers=headers,
    )
    response.raise_for_status()
    yield response.json()

@dlt.source
def github_source(api_token: str = dlt.secrets.value):
    """Load GitHub data"""
    return get_github_repos(api_token)

secrets.toml:

[sources.github_source]
api_token = "ghp_your_token_here"
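The same secret can also come from an environment variable: dlt maps the TOML path to an uppercase name with double-underscore separators. A small sketch, shown from Python for illustration:

import os

# Equivalent to api_token under [sources.github_source] in secrets.toml
os.environ["SOURCES__GITHUB_SOURCE__API_TOKEN"] = "ghp_your_token_here"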
Pagination

Simple Pagination
Iterate through paginated API responses:

@dlt.resource(name="issues", write_disposition="replace")
def get_issues(api_token: str = dlt.secrets.value):
    """Fetch issues with pagination"""
    headers = {"Authorization": f"Bearer {api_token}"}
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    params = {"per_page": 100, "page": 1}
    while True:
        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        issues = response.json()
        if not issues:
            break
        yield issues
        # Advance to the next page only if the Link header announces one
        link_header = response.headers.get("Link")
        if link_header and 'rel="next"' in link_header:
            params["page"] += 1
        else:
            break
Using REST Client Helper
Use dlt's built-in REST client for automatic pagination:

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

@dlt.resource
def paginated_resource(api_token: str = dlt.secrets.value):
    """Use REST client for automatic pagination"""
    client = RESTClient(
        base_url="https://api.github.com",
        auth=BearerTokenAuth(token=api_token),
    )
    for page in client.paginate(
        "/repos/dlt-hub/dlt/issues",
        params={"per_page": 100},
        # GitHub announces the next page in the Link response header
        paginator=HeaderLinkPaginator(),
    ):
        yield page
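In many cases the paginator argument can be omitted entirely: the client tries to detect a common pagination style (header links, next-URL fields, offsets) from the first response. Assuming detection succeeds for this endpoint, the paginate call above could shrink to:

for page in client.paginate("/repos/dlt-hub/dlt/issues", params={"per_page": 100}):
    yield page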
Incremental Loading
Load only new or updated records:

Basic Incremental

@dlt.resource(
    name="events",
    write_disposition="append",
)
def get_events(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Fetch events incrementally"""
    response = requests.get(
        "https://api.example.com/events",
        params={"since": updated_at.last_value},
    )
    yield response.json()
Incremental with Merge

@dlt.resource(
    name="users",
    write_disposition="merge",
    primary_key="id",
)
def get_users(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Fetch and merge updated users"""
    response = requests.get(
        "https://api.example.com/users",
        params={
            "updated_since": updated_at.last_value,
            "limit": 1000,
        },
    )
    yield response.json()["users"]
Nested Incremental

@dlt.resource(
    name="orders",
    write_disposition="merge",
    primary_key="order_id",
)
def get_orders(
    created_at=dlt.sources.incremental(
        "order_date",  # Cursor field in the data
        initial_value="2024-01-01T00:00:00Z",
    )
):
    """Incremental loading based on a nested field"""
    response = requests.get(
        "https://api.example.com/orders",
        params={"created_after": created_at.last_value},
    )
    yield response.json()
The incremental object automatically tracks the maximum cursor value and uses it as the starting point for subsequent runs.
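For bounded backfills, dlt.sources.incremental also accepts an end_value, so a run covers only a fixed window. A sketch, with illustrative endpoint and field names:

@dlt.resource(name="events_backfill", write_disposition="append")
def get_events_window(
    updated_at=dlt.sources.incremental(
        "updated_at",
        initial_value="2024-01-01T00:00:00Z",
        end_value="2024-02-01T00:00:00Z",  # Stop once the cursor passes this
    )
):
    response = requests.get(
        "https://api.example.com/events",
        params={"since": updated_at.last_value},
    )
    yield response.json()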
Transformers
Process data from parent resources:

@dlt.resource(name="users", write_disposition="replace")
def get_users():
    """Fetch users"""
    response = requests.get("https://api.example.com/users")
    yield response.json()

@dlt.transformer(
    name="user_posts",
    write_disposition="replace",
    data_from=get_users,
)
def get_user_posts(user):
    """Fetch posts for each user"""
    user_id = user["id"]
    response = requests.get(f"https://api.example.com/users/{user_id}/posts")
    yield response.json()

@dlt.source
def api_source():
    """Source with dependent resources"""
    return [get_users(), get_user_posts()]
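Transformers can also be bound with the pipe operator instead of data_from; a transformer declared without data_from pipes directly from its parent resource. A brief sketch, assuming a pipeline like those in the earlier examples:

@dlt.transformer
def user_posts(user):
    """Fetch posts for one user at a time"""
    response = requests.get(f"https://api.example.com/users/{user['id']}/posts")
    yield response.json()

# The pipe operator feeds each user yielded by get_users into user_posts
pipeline.run(get_users() | user_posts)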
Filtering and Transforming Data
Modify data before loading:

@dlt.resource(name="active_users")
def get_active_users():
    """Load only active users"""
    response = requests.get("https://api.example.com/users")
    users = response.json()
    # Filter active users
    for user in users:
        if user.get("status") == "active":
            yield user
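The same effect can be attached to an existing resource with add_filter and add_map, keeping the fetch logic unchanged. A sketch, assuming a get_users resource like the one in the Transformers section:

users = get_users()
# Keep only active users, then drop a field before loading
users.add_filter(lambda user: user.get("status") == "active")
users.add_map(lambda user: {k: v for k, v in user.items() if k != "email"})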
Schema Definition

Using Column Hints
Define column types and constraints:

@dlt.resource(
    name="products",
    write_disposition="merge",
    primary_key="product_id",
    columns={
        "product_id": {"data_type": "bigint", "nullable": False},
        "name": {"data_type": "text", "nullable": False},
        "price": {"data_type": "decimal", "precision": 10, "scale": 2},
        "tags": {"data_type": "json"},  # Store as JSON, not a nested table
        "created_at": {"data_type": "timestamp"},
    },
)
def get_products():
    """Fetch products with schema hints"""
    response = requests.get("https://api.example.com/products")
    yield response.json()
Using Pydantic Models
Define schema with Pydantic:

from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

class Product(BaseModel):
    product_id: int
    name: str
    price: float
    tags: List[str]
    created_at: datetime
    description: Optional[str] = None

@dlt.resource(
    name="products",
    write_disposition="merge",
    primary_key="product_id",
    columns=Product,
)
def get_products():
    """Fetch products with Pydantic schema"""
    response = requests.get("https://api.example.com/products")
    yield response.json()
Error Handling
Implement robust error handling. Note that dlt's bundled requests helper (dlt.sources.helpers.requests) already retries transient failures; explicit retry logic is still useful when you need a custom policy:

from tenacity import retry, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def fetch_data():
    """Fetch data with automatic retry"""
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource(name="resilient_resource")
def get_data_with_retry():
    # Retry wraps the plain fetch function rather than the generator:
    # decorating a generator would never retry, because its body only
    # runs when the resource is iterated
    try:
        yield fetch_data()
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        raise

@dlt.resource(name="error_tolerant")
def get_data_with_error_handling():
    """Continue on individual record errors"""
    response = requests.get("https://api.example.com/records")
    records = response.json()
    for record in records:
        try:
            # process_record is a placeholder for your own logic
            processed = process_record(record)
            yield processed
        except Exception as e:
            # Log the error but continue with the remaining records
            logger.warning(f"Failed to process record {record.get('id')}: {e}")
            continue
Complete Example: Custom API Source
A comprehensive example with authentication, pagination, and incremental loading:

import dlt
from dlt.sources.helpers import requests
from typing import Iterator, Dict, Any

@dlt.source
def ecommerce_api(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
):
    """Load data from e-commerce API"""

    @dlt.resource(
        name="customers",
        write_disposition="merge",
        primary_key="customer_id",
    )
    def get_customers(
        updated_at=dlt.sources.incremental(
            "updated_at",
            initial_value="2024-01-01T00:00:00Z",
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch customers incrementally"""
        headers = {"Authorization": f"Bearer {api_key}"}
        page = 1
        while True:
            response = requests.get(
                f"{base_url}/customers",
                headers=headers,
                params={
                    "updated_since": updated_at.last_value,
                    "page": page,
                    "per_page": 100,
                },
            )
            response.raise_for_status()
            data = response.json()
            customers = data.get("customers", [])
            if not customers:
                break
            yield customers
            if not data.get("has_more"):
                break
            page += 1

    @dlt.resource(
        name="orders",
        write_disposition="merge",
        primary_key="order_id",
    )
    def get_orders(
        created_at=dlt.sources.incremental(
            "created_at",
            initial_value="2024-01-01T00:00:00Z",
        )
    ) -> Iterator[Dict[str, Any]]:
        """Fetch orders incrementally"""
        headers = {"Authorization": f"Bearer {api_key}"}
        page = 1
        while True:
            response = requests.get(
                f"{base_url}/orders",
                headers=headers,
                params={
                    "created_after": created_at.last_value,
                    "page": page,
                    "per_page": 100,
                    "expand": "line_items",
                },
            )
            response.raise_for_status()
            data = response.json()
            orders = data.get("orders", [])
            if not orders:
                break
            yield orders
            if not data.get("has_more"):
                break
            page += 1

    @dlt.resource(
        name="products",
        write_disposition="replace",
    )
    def get_products() -> Iterator[Dict[str, Any]]:
        """Fetch all products"""
        headers = {"Authorization": f"Bearer {api_key}"}
        page = 1
        while True:
            response = requests.get(
                f"{base_url}/products",
                headers=headers,
                params={"page": page, "per_page": 100},
            )
            response.raise_for_status()
            data = response.json()
            products = data.get("products", [])
            if not products:
                break
            yield products
            if not data.get("has_more"):
                break
            page += 1

    return [get_customers(), get_orders(), get_products()]

# Configure in config.toml
# [sources.ecommerce_api]
# base_url = "https://api.example.com/v1"

# Configure in secrets.toml
# [sources.ecommerce_api]
# api_key = "your_api_key_here"

# Run the pipeline
if __name__ == "__main__":
    pipeline = dlt.pipeline(
        pipeline_name="ecommerce_pipeline",
        destination="duckdb",
        dataset_name="ecommerce_data",
    )
    load_info = pipeline.run(ecommerce_api())
    print(load_info)
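During development it often helps to load one resource at a time; a source's resources can be selected by name. A sketch, reusing the pipeline above:

# Load only the customers resource from the source
load_info = pipeline.run(ecommerce_api().with_resources("customers"))
print(load_info)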
Best Practices

Use generators for memory efficiency
Always use yield instead of returning large lists:

# ✅ Good: Memory efficient
@dlt.resource
def get_data():
    for page in range(100):
        data = fetch_page(page)
        yield data

# ❌ Bad: Loads everything into memory
@dlt.resource
def get_data():
    all_data = []
    for page in range(100):
        all_data.extend(fetch_page(page))
    return all_data
Implement proper error handling
Use retry logic and handle errors gracefully:

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def fetch_with_retry():
    response = requests.get("https://api.example.com/data")
    response.raise_for_status()
    return response.json()

@dlt.resource
def resilient_resource():
    # Keep @retry on a plain function; a retried generator would not
    # re-run, since its body executes only during iteration
    yield fetch_with_retry()
Use configuration for flexibility
Externalize configuration and secrets:

@dlt.source
def my_source(
    api_key: str = dlt.secrets.value,
    base_url: str = dlt.config.value,
    batch_size: int = dlt.config.value,
):
    # Source implementation
    pass
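The corresponding values live in .dlt/secrets.toml and .dlt/config.toml; the section name mirrors the source function (values here are illustrative):

# .dlt/secrets.toml
[sources.my_source]
api_key = "your_api_key_here"

# .dlt/config.toml
[sources.my_source]
base_url = "https://api.example.com"
batch_size = 100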
Add type hints
Use type hints for better IDE support and documentation:

from typing import Iterator, Dict, Any

@dlt.resource
def typed_resource() -> Iterator[Dict[str, Any]]:
    yield {"key": "value"}
Next Steps
- Explore REST API Source for declarative API loading
- Learn about SQL Database Source
- Check Available Sources