Documentation Index: fetch the complete documentation index at https://mintlify.com/dlt-hub/dlt/llms.txt
Use this file to discover all available pages before exploring further.
Data contracts give you control over schema evolution by defining rules for how dlt handles schema changes. You can enforce strict validation, allow flexibility, or find a balance between the two.
Quick Start
Control schema evolution at the resource level:
import dlt
@dlt.resource (
schema_contract = {
"tables" : "evolve" , # Allow new tables
"columns" : "freeze" , # Reject new columns
"data_type" : "freeze" # Reject type changes
}
)
def items ():
yield [{ "id" : 1 , "name" : "Alice" }]
pipeline = dlt.pipeline(
pipeline_name = "strict" ,
destination = "duckdb" ,
dataset_name = "data"
)
# This will fail if new columns appear
pipeline.run(items())
Schema Entities
Control three types of schema changes:
tables — controls when new tables are created (including nested tables and dynamic table names): @dlt.resource ( schema_contract = { "tables" : "freeze" })
def users ():
yield [
{
"id" : 1 ,
"name" : "Alice" ,
# This nested list would create a new table -> ERROR
"orders" : [{ "order_id" : 1 }]
}
]
columns — controls when new columns are added to existing tables: @dlt.resource ( schema_contract = { "columns" : "freeze" })
def users ():
yield [
{
"id" : 1 ,
"name" : "Alice" ,
"email" : "alice@example.com" , # New column -> ERROR
}
]
data_type — controls when column data types change: @dlt.resource ( schema_contract = { "data_type" : "freeze" })
def results ():
yield [
{ "score" : 42 }, # First: integer
{ "score" : "high" }, # Type change -> ERROR
]
Contract Modes
evolve — default mode: no constraints on schema changes
New tables are created
New columns are added
Type changes create variant columns
schema_contract = "evolve" # or
schema_contract = {
"tables" : "evolve" ,
"columns" : "evolve" ,
"data_type" : "evolve"
}
freeze — strict mode: reject any schema changes
Raises exception on schema changes
No data is loaded
Best for production stability
schema_contract = "freeze" # or
schema_contract = {
"tables" : "freeze" ,
"columns" : "freeze" ,
"data_type" : "freeze"
}
discard_row — filter mode: skip rows that don't match the schema
Discards non-conforming rows
Continues loading valid data
Useful for data quality
schema_contract = "discard_row"
discard_value — clean mode: remove non-conforming values from rows
Strips invalid fields from rows
Loads rows with valid fields
Maintains data flow
schema_contract = "discard_value"
Setting Contracts
Resource Level
@dlt.resource (
schema_contract = {
"tables" : "evolve" ,
"columns" : "freeze" ,
"data_type" : "freeze"
}
)
def strict_users ():
"""New tables OK, but columns and types are frozen"""
yield [{ "id" : 1 , "name" : "Alice" }]
Source Level
Apply to all resources in a source:
@dlt.source ( schema_contract = "freeze" )
def api_source ():
"""All resources inherit freeze contract"""
@dlt.resource
def users ():
yield [{ "id" : 1 , "name" : "Alice" }]
@dlt.resource
def orders ():
yield [{ "order_id" : 1 , "amount" : 50.0 }]
return users, orders
Pipeline Run Level
Override all existing contracts:
pipeline = dlt.pipeline(
pipeline_name = "api" ,
destination = "duckdb" ,
dataset_name = "data"
)
# Override any resource/source contracts
pipeline.run(
api_source(),
schema_contract = "freeze" # Strictest setting
)
Dynamic Contract Updates
Change contracts at runtime:
@dlt.resource
def users ():
yield [{ "id" : 1 , "name" : "Alice" }]
# Update contract on resource instance
users.apply_hints(
schema_contract = {
"columns" : "freeze" ,
"data_type" : "discard_row"
}
)
# Update on source instance
source = api_source()
source.schema_contract = { "tables" : "freeze" }
Pydantic Validation
Use Pydantic models for type-safe validation:
import dlt
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional
class Address ( BaseModel ):
street: str
city: str
zipcode: str
class User ( BaseModel ):
id : int
name: str = Field( min_length = 1 , max_length = 100 )
email: EmailStr
age: Optional[ int ] = Field( None , ge = 0 , le = 150 )
address: Address
tags: List[ str ] = []
@dlt.resource ( columns = User)
def users ():
"""Validates against User model automatically"""
yield [
{
"id" : 1 ,
"name" : "Alice Smith" ,
"email" : "alice@example.com" ,
"age" : 30 ,
"address" : {
"street" : "123 Main St" ,
"city" : "NYC" ,
"zipcode" : "10001"
},
"tags" : [ "premium" , "verified" ]
}
]
pipeline = dlt.pipeline(
pipeline_name = "validated" ,
destination = "duckdb" ,
dataset_name = "user_data"
)
# Automatically validates all data against User model
pipeline.run(users())
Pydantic Contract Mapping
Pydantic models automatically set contracts:
# Using columns=Model sets:
schema_contract = {
"tables" : "evolve" , # New tables allowed
"columns" : "discard_value" , # Extra fields ignored
"data_type" : "freeze" # Invalid types raise error
}
Customize Pydantic behavior:
from pydantic import ConfigDict
class StrictUser ( BaseModel ):
model_config = ConfigDict(
extra = "forbid" # Maps to columns="freeze"
)
id : int
name: str
class LenientUser ( BaseModel ):
model_config = ConfigDict(
extra = "allow" # Maps to columns="evolve"
)
id : int
name: str
Mapping between contract modes and Pydantic:
Contract mode → Pydantic `extra` setting:
- evolve → allow
- freeze → forbid
- discard_value → ignore
- discard_row → forbid*
*With ValidationError handling
Real-World Examples
Production API (Strict)
@dlt.resource (
primary_key = "id" ,
schema_contract = "freeze"
)
def production_users ():
"""Strict validation - any schema change fails"""
yield from fetch_users()
# Fails fast on unexpected changes
pipeline.run(production_users())
Development API (Flexible)
@dlt.resource (
primary_key = "id" ,
schema_contract = "evolve"
)
def dev_users ():
"""Flexible - adapts to any changes"""
yield from fetch_users()
# Adapts to schema changes automatically
pipeline.run(dev_users())
Data Quality (Filtering)
@dlt.resource (
primary_key = "id" ,
schema_contract = {
"tables" : "evolve" ,
"columns" : "discard_value" , # Strip unknown fields
"data_type" : "discard_row" # Skip invalid rows
}
)
def quality_checked_users ():
"""Loads valid data, discards invalid"""
yield from fetch_users()
load_info = pipeline.run(quality_checked_users())
# Check what was discarded
print ( f "Rows discarded: { load_info.metrics[ 'rows_discarded' ] } " )
Mixed Strategy
@dlt.source
def mixed_source ():
# Critical table - frozen
@dlt.resource ( schema_contract = "freeze" )
def transactions ():
yield from fetch_transactions()
# Reference data - flexible
@dlt.resource ( schema_contract = "evolve" )
def categories ():
yield from fetch_categories()
# Event data - filter bad rows
@dlt.resource ( schema_contract = "discard_row" )
def events ():
yield from fetch_events()
return transactions, categories, events
pipeline.run(mixed_source())
Contract Inheritance
Contracts flow from general to specific:
# 1. Pipeline run (highest priority)
pipeline.run(source(), schema_contract = "freeze" )
# 2. Source level
@dlt.source ( schema_contract = "discard_row" )
def source ():
# 3. Resource level (lowest priority)
@dlt.resource ( schema_contract = "evolve" )
def data ():
yield [ ... ]
return data
# Final contract = "freeze" (from pipeline.run)
Handling Contract Violations
import dlt
from dlt.common.exceptions import PipelineStepFailed
pipeline = dlt.pipeline(
pipeline_name = "validated" ,
destination = "duckdb" ,
dataset_name = "data"
)
try :
pipeline.run(
strict_resource(),
schema_contract = "freeze"
)
except PipelineStepFailed as e:
print ( f "Schema contract violation: { e } " )
# Handle error (alert, log, retry with different contract, etc.)
Best Practices
Start Flexible, Then Tighten
# Development: flexible
schema_contract = "evolve"
# Staging: moderate
schema_contract = {
"tables" : "freeze" ,
"columns" : "evolve" ,
"data_type" : "discard_row"
}
# Production: strict
schema_contract = "freeze"
Use Pydantic for Complex Validation
from pydantic import BaseModel, validator
class User ( BaseModel ):
id : int
email: str
@validator ( 'email' )
def validate_email ( cls , v ):
if '@' not in v:
raise ValueError ( 'Invalid email' )
return v
@dlt.resource ( columns = User)
def users ():
yield from fetch_users()
Monitor Contract Violations
load_info = pipeline.run(resource())
# Check for schema changes
for package in load_info.load_packages:
if package.schema_update:
send_alert( f "Schema changed: { package.schema_update } " )
@dlt.resource (
schema_contract = "freeze"
)
def critical_data ():
"""PRODUCTION RESOURCE - DO NOT MODIFY SCHEMA
Contract: freeze
- No new tables
- No new columns
- No type changes
Contact: data-team@example.com for changes
"""
yield from fetch_data()
Next Steps
Schema Evolution — learn about automatic schema evolution
Resources — deep dive into resources and hints