Load data directly from Python objects, including lists, dictionaries, Pandas DataFrames, API responses, and custom generators. This is the most flexible way to use dlt.
## Quick Start

Load a simple list of dictionaries:

```python
import dlt

data = [
    {"id": 1, "name": "Alice", "email": "alice@example.com"},
    {"id": 2, "name": "Bob", "email": "bob@example.com"},
]

pipeline = dlt.pipeline(
    pipeline_name="simple_load",
    destination="duckdb",
    dataset_name="users",
)

load_info = pipeline.run(data, table_name="users")
print(load_info)
```
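One way to check what was loaded is to query the DuckDB database that dlt created. The sketch below assumes the duckdb destination's default behaviour: a `simple_load.duckdb` file in the working directory, with the dataset name (`users`) used as the schema.

```python
import duckdb

# Connect to the file created by the "simple_load" pipeline above.
conn = duckdb.connect("simple_load.duckdb")

# dlt stores tables under the dataset name, so the table is users.users.
print(conn.sql("SELECT id, name, email FROM users.users").fetchall())
```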
## Load from API

Fetch and load data from any HTTP API:

```python
import dlt
from dlt.sources.helpers import requests

# Fetch data from the API
response = requests.get("https://api.chess.com/pub/player/magnuscarlsen")
response.raise_for_status()

pipeline = dlt.pipeline(
    pipeline_name="chess_api",
    destination="duckdb",
    dataset_name="chess_data",
)

load_info = pipeline.run(
    response.json(),
    table_name="players",
)
print(load_info)
```
## Load Multiple Records

Fetch and load multiple records from an API:

```python
import dlt
from dlt.sources.helpers import requests

pipeline = dlt.pipeline(
    pipeline_name="chess_pipeline",
    destination="duckdb",
    dataset_name="player_data",
)

# Fetch data for multiple players
data = []
for player in ["magnuscarlsen", "rpragchess", "hikaru"]:
    response = requests.get(f"https://api.chess.com/pub/player/{player}")
    response.raise_for_status()
    data.append(response.json())

load_info = pipeline.run(data, table_name="players")
print(load_info)
```
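Because `pipeline.run()` accepts any iterable, you can also stream the records with a generator instead of building an intermediate list. A minimal sketch, reusing the `pipeline` and the `requests` import from the example above:

```python
def chess_players():
    # Yield one player profile at a time instead of collecting them in a list.
    for player in ["magnuscarlsen", "rpragchess", "hikaru"]:
        response = requests.get(f"https://api.chess.com/pub/player/{player}")
        response.raise_for_status()
        yield response.json()

load_info = pipeline.run(chess_players(), table_name="players")
print(load_info)
```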
## Load from Pandas DataFrame

Load a pandas DataFrame, for example one built from a CSV file:

```python
import dlt
import pandas as pd

# Read a CSV into a DataFrame
csv_url = (
    "https://raw.githubusercontent.com/owid/owid-datasets/master/datasets/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020)/"
    "Natural%20disasters%20from%201900%20to%202019%20-%20EMDAT%20(2020).csv"
)
df = pd.read_csv(csv_url)

pipeline = dlt.pipeline(
    pipeline_name="from_csv",
    destination="duckdb",
    dataset_name="disasters",
)

load_info = pipeline.run(df, table_name="natural_disasters")
print(load_info)
```
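dlt appends data by default, so re-running this script would load the CSV rows a second time. To overwrite the table on each run instead, pass a write disposition to `pipeline.run()` (reusing `df` and `pipeline` from above):

```python
# "replace" rewrites the table on every run; "append" is the default.
load_info = pipeline.run(
    df,
    table_name="natural_disasters",
    write_disposition="replace",
)
print(load_info)
```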
## Load from SQL Database

Load data directly using SQLAlchemy:

```python
import dlt
import sqlalchemy as sa

engine = sa.create_engine(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
)

with engine.connect() as conn:
    # Stream data in batches of 100 rows
    query = "SELECT * FROM genome LIMIT 1000"
    rows = conn.execution_options(yield_per=100).exec_driver_sql(query)

    pipeline = dlt.pipeline(
        pipeline_name="from_database",
        destination="duckdb",
        dataset_name="genome_data",
    )

    # Convert rows to dictionaries while the connection is still open
    data = map(lambda row: dict(row._mapping), rows)
    load_info = pipeline.run(data, table_name="genome")

print(load_info)
```
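For anything beyond an ad-hoc query, dlt also ships a built-in `sql_database` source that reflects tables and handles batching and typing for you. A minimal sketch against the same public Rfam database, assuming the `sqlalchemy` and `pymysql` dependencies are installed:

```python
import dlt
from dlt.sources.sql_database import sql_database

# Reflect the database and select just the "genome" table.
source = sql_database(
    "mysql+pymysql://rfamro@mysql-rfam-public.ebi.ac.uk:4497/Rfam"
).with_resources("genome")

pipeline = dlt.pipeline(
    pipeline_name="sql_database_example",
    destination="duckdb",
    dataset_name="genome_data",
)
print(pipeline.run(source))
```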
## Using Resources

Create reusable data resources:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.resource(write_disposition="replace")
def github_issues(repository="dlt-hub/dlt"):
    """Fetch GitHub issues"""
    url = f"https://api.github.com/repos/{repository}/issues"
    response = requests.get(url, params={"state": "open", "per_page": 100})
    response.raise_for_status()
    yield response.json()

# Use the resource
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

load_info = pipeline.run(github_issues())
print(load_info)
```
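`write_disposition="replace"` rewrites the whole table on every run. If you would rather upsert, resources also accept the `merge` disposition together with a primary key. A minimal sketch, identical to the resource above except for the decorator:

```python
import dlt
from dlt.sources.helpers import requests

# Merge on the issue id: existing rows are updated, new ones inserted.
@dlt.resource(write_disposition="merge", primary_key="id")
def github_issues(repository="dlt-hub/dlt"):
    url = f"https://api.github.com/repos/{repository}/issues"
    response = requests.get(url, params={"state": "open", "per_page": 100})
    response.raise_for_status()
    yield response.json()
```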
## Generator Functions

Stream data efficiently using generators:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.resource
def paginated_api(base_url: str, endpoint: str):
    """Fetch paginated data from an API"""
    page = 1
    while True:
        response = requests.get(
            f"{base_url}/{endpoint}",
            params={"page": page, "per_page": 100},
        )
        response.raise_for_status()
        data = response.json()
        if not data:
            break
        yield data
        page += 1

pipeline = dlt.pipeline(
    pipeline_name="paginated",
    destination="duckdb",
    dataset_name="api_data",
)

load_info = pipeline.run(
    paginated_api(
        base_url="https://api.example.com",
        endpoint="users",
    )
)
print(load_info)
```
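While developing a paginated resource it helps to cap how much data a test run pulls. Resources expose `add_limit()` for this; a minimal sketch reusing `paginated_api` and `pipeline` from above:

```python
# Consume at most two pages from the generator during a test run.
load_info = pipeline.run(
    paginated_api(base_url="https://api.example.com", endpoint="users").add_limit(2)
)
print(load_info)
```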
## Using Sources

Group multiple resources into a source:

```python
import dlt
from dlt.sources.helpers import requests

@dlt.source
def github_source(repository: str = "dlt-hub/dlt"):
    """GitHub source with multiple resources"""

    @dlt.resource(primary_key="id")
    def issues():
        url = f"https://api.github.com/repos/{repository}/issues"
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

    @dlt.resource(primary_key="id")
    def pull_requests():
        url = f"https://api.github.com/repos/{repository}/pulls"
        response = requests.get(url)
        response.raise_for_status()
        yield response.json()

    return issues, pull_requests

# Load all resources from the source
pipeline = dlt.pipeline(
    pipeline_name="github",
    destination="duckdb",
    dataset_name="github_data",
)

load_info = pipeline.run(github_source())
print(load_info)
```
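You do not have to load everything a source defines: `with_resources()` narrows a source down to selected resources. A minimal sketch reusing `github_source` and `pipeline` from above:

```python
# Load only the issues resource and skip pull_requests.
load_info = pipeline.run(github_source().with_resources("issues"))
print(load_info)
```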
## Nested Data

dlt automatically handles nested structures:

```python
import dlt

data = [
    {
        "id": 1,
        "organization": "Tech Innovations Inc.",
        "address": {
            "building": "r&d",
            "room": 7890,
        },
        "inventory": [
            {"name": "Plasma ray", "inventory_nr": 2411},
            {"name": "Self-aware Roomba", "inventory_nr": 268},
        ],
    }
]

pipeline = dlt.pipeline(
    pipeline_name="nested_data",
    destination="duckdb",
    dataset_name="organizations",
)

load_info = pipeline.run(data, table_name="org")
print(load_info)
# Creates tables: org, org__inventory
```
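Nested dictionaries such as `address` are flattened into columns (for example `address__building`), while lists of objects such as `inventory` become child tables linked to the parent row. A minimal sketch of inspecting that relationship, assuming the default `nested_data.duckdb` file created by the pipeline above:

```python
import duckdb

conn = duckdb.connect("nested_data.duckdb")

# Child rows carry _dlt_parent_id, which matches the parent's _dlt_id.
print(
    conn.sql(
        """
        SELECT o.organization, i.name, i.inventory_nr
        FROM organizations.org AS o
        JOIN organizations.org__inventory AS i
          ON i._dlt_parent_id = o._dlt_id
        """
    ).fetchall()
)
```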
## Using REST Client Helpers

Simplify API pagination with built-in helpers:

```python
import dlt
from dlt.sources.helpers.rest_client import paginate
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
from dlt.sources.helpers.rest_client.paginators import HeaderLinkPaginator

@dlt.resource(write_disposition="replace")
def github_issues(api_token=dlt.secrets.value):
    """Paginate through GitHub issues automatically"""
    url = "https://api.github.com/repos/dlt-hub/dlt/issues"
    auth = BearerTokenAuth(api_token) if api_token else None
    for page in paginate(
        url,
        auth=auth,
        paginator=HeaderLinkPaginator(),
        params={"state": "open", "per_page": 100},
    ):
        yield page

pipeline = dlt.pipeline(
    pipeline_name="github_paginated",
    destination="duckdb",
    dataset_name="github_data",
)

load_info = pipeline.run(github_issues())
print(load_info)
```
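The `api_token` argument is not passed in code: `dlt.secrets.value` tells dlt to resolve it through its configuration system, typically from `.dlt/secrets.toml` or an environment variable.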
## Arrow/Parquet Data

Load PyArrow tables directly:

```python
import dlt
import pyarrow as pa
import pyarrow.parquet as pq

# Read a Parquet file as an Arrow table
table = pq.read_table("data/users.parquet")

pipeline = dlt.pipeline(
    pipeline_name="arrow_load",
    destination="duckdb",
    dataset_name="user_data",
)

load_info = pipeline.run(
    table,
    table_name="users",
    loader_file_format="parquet",
)
print(load_info)
```
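If the data is already in memory, you can build the Arrow table directly instead of reading a Parquet file, for example from a pandas DataFrame (reusing `pipeline` from above):

```python
import pandas as pd
import pyarrow as pa

# Construct an Arrow table in memory and load it the same way.
df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
table = pa.Table.from_pandas(df)

load_info = pipeline.run(table, table_name="users", loader_file_format="parquet")
print(load_info)
```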
## Complete Example: Multi-Source Pipeline

```python
import dlt
import pandas as pd
from dlt.sources.helpers import requests

def load_all_data():
    pipeline = dlt.pipeline(
        pipeline_name="multi_source",
        destination="duckdb",
        dataset_name="analytics",
    )

    # Load from an API
    api_response = requests.get(
        "https://api.chess.com/pub/player/magnuscarlsen"
    )
    api_response.raise_for_status()
    api_data = api_response.json()

    # Load from pandas
    df = pd.read_csv("data/users.csv")

    # Load from a Python list
    events = [
        {"event": "login", "user_id": 1, "timestamp": "2024-01-01"},
        {"event": "logout", "user_id": 1, "timestamp": "2024-01-01"},
    ]

    # Load all at once
    load_info = pipeline.run(
        [
            dlt.resource(api_data, name="chess_players"),
            dlt.resource(df, name="users"),
            dlt.resource(events, name="events"),
        ]
    )
    print(load_info)

if __name__ == "__main__":
    load_all_data()
```
## Next Steps

- **Resources & Sources**: learn about resources and sources
- **Incremental Loading**: add incremental loading to your pipelines