Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/dlt-hub/dlt/llms.txt

Use this file to discover all available pages before exploring further.

Data contracts give you control over schema evolution by defining rules for how dlt handles schema changes. You can enforce strict validation, allow flexibility, or find a balance between the two.

Quick Start

Control schema evolution at the resource level:
import dlt

@dlt.resource(
    schema_contract={
        "tables": "evolve",    # Allow new tables
        "columns": "freeze",   # Reject new columns
        "data_type": "freeze"  # Reject type changes
    }
)
def items():
    yield [{"id": 1, "name": "Alice"}]

pipeline = dlt.pipeline(
    pipeline_name="strict",
    destination="duckdb",
    dataset_name="data"
)

# This will fail if new columns appear
pipeline.run(items())

Schema Entities

Control three types of schema changes: tables, columns, and data_type.

tables

Controls when new tables are created (including nested tables and dynamic table names):
@dlt.resource(schema_contract={"tables": "freeze"})
def users():
    yield [
        {
            "id": 1,
            "name": "Alice",
            # This nested list would create a new table -> ERROR
            "orders": [{"order_id": 1}]  
        }
    ]

Contract Modes

evolve

Default mode - No constraints on schema changes
  • New tables are created
  • New columns are added
  • Type changes create variant columns
schema_contract="evolve"  # or
schema_contract={
    "tables": "evolve",
    "columns": "evolve",
    "data_type": "evolve"
}

freeze

Strict mode - Reject any schema changes
  • Raises exception on schema changes
  • No data is loaded
  • Best for production stability
schema_contract="freeze"  # or
schema_contract={
    "tables": "freeze",
    "columns": "freeze",
    "data_type": "freeze"
}

discard_row

Filter mode - Skip rows that don’t match schema
  • Discards non-conforming rows
  • Continues loading valid data
  • Useful for data quality
schema_contract="discard_row"

discard_value

Clean mode - Remove non-conforming values
  • Strips invalid fields from rows
  • Loads rows with valid fields
  • Maintains data flow
schema_contract="discard_value"

Setting Contracts

Resource Level

@dlt.resource(
    schema_contract={
        "tables": "evolve",
        "columns": "freeze",
        "data_type": "freeze"
    }
)
def strict_users():
    """New tables OK, but columns and types are frozen"""
    yield [{"id": 1, "name": "Alice"}]

Source Level

Apply to all resources in a source:
@dlt.source(schema_contract="freeze")
def api_source():
    """All resources inherit freeze contract"""
    
    @dlt.resource
    def users():
        yield [{"id": 1, "name": "Alice"}]
    
    @dlt.resource
    def orders():
        yield [{"order_id": 1, "amount": 50.0}]
    
    return users, orders

Pipeline Run Level

Override all existing contracts:
pipeline = dlt.pipeline(
    pipeline_name="api",
    destination="duckdb",
    dataset_name="data"
)

# Override any resource/source contracts
pipeline.run(
    api_source(),
    schema_contract="freeze"  # Strictest setting
)

Dynamic Contract Updates

Change contracts at runtime:
@dlt.resource
def users():
    yield [{"id": 1, "name": "Alice"}]

# Update contract on resource instance
users.apply_hints(
    schema_contract={
        "columns": "freeze",
        "data_type": "discard_row"
    }
)

# Update on source instance
source = api_source()
source.schema_contract = {"tables": "freeze"}

Pydantic Validation

Use Pydantic models for type-safe validation:
import dlt
from pydantic import BaseModel, EmailStr, Field
from typing import List, Optional

class Address(BaseModel):
    street: str
    city: str
    zipcode: str

class User(BaseModel):
    id: int
    name: str = Field(min_length=1, max_length=100)
    email: EmailStr
    age: Optional[int] = Field(None, ge=0, le=150)
    address: Address
    tags: List[str] = []

@dlt.resource(columns=User)
def users():
    """Validates against User model automatically"""
    yield [
        {
            "id": 1,
            "name": "Alice Smith",
            "email": "alice@example.com",
            "age": 30,
            "address": {
                "street": "123 Main St",
                "city": "NYC",
                "zipcode": "10001"
            },
            "tags": ["premium", "verified"]
        }
    ]

pipeline = dlt.pipeline(
    pipeline_name="validated",
    destination="duckdb",
    dataset_name="user_data"
)

# Automatically validates all data against User model
pipeline.run(users())

Pydantic Contract Mapping

Pydantic models automatically set contracts:
# Using columns=Model sets:
schema_contract = {
    "tables": "evolve",        # New tables allowed
    "columns": "discard_value", # Extra fields ignored
    "data_type": "freeze"       # Invalid types raise error
}
Customize Pydantic behavior:
from pydantic import ConfigDict

class StrictUser(BaseModel):
    model_config = ConfigDict(
        extra="forbid"  # Maps to columns="freeze"
    )
    
    id: int
    name: str

class LenientUser(BaseModel):
    model_config = ConfigDict(
        extra="allow"  # Maps to columns="evolve"
    )
    
    id: int
    name: str

Pydantic Extra Modes

Mapping between contract modes and Pydantic:
| Contract Mode | Pydantic Extra |
| ------------- | -------------- |
| evolve | allow |
| freeze | forbid |
| discard_value | ignore |
| discard_row | forbid* |
*With ValidationError handling

Real-World Examples

Production API (Strict)

@dlt.resource(
    primary_key="id",
    schema_contract="freeze"
)
def production_users():
    """Strict validation - any schema change fails"""
    yield from fetch_users()

# Fails fast on unexpected changes
pipeline.run(production_users())

Development API (Flexible)

@dlt.resource(
    primary_key="id",
    schema_contract="evolve"
)
def dev_users():
    """Flexible - adapts to any changes"""
    yield from fetch_users()

# Adapts to schema changes automatically
pipeline.run(dev_users())

Data Quality (Filtering)

@dlt.resource(
    primary_key="id",
    schema_contract={
        "tables": "evolve",
        "columns": "discard_value",  # Strip unknown fields
        "data_type": "discard_row"    # Skip invalid rows
    }
)
def quality_checked_users():
    """Loads valid data, discards invalid"""
    yield from fetch_users()

load_info = pipeline.run(quality_checked_users())

# Check what was discarded
print(f"Rows discarded: {load_info.metrics['rows_discarded']}")

Mixed Strategy

@dlt.source
def mixed_source():
    # Critical table - frozen
    @dlt.resource(schema_contract="freeze")
    def transactions():
        yield from fetch_transactions()
    
    # Reference data - flexible
    @dlt.resource(schema_contract="evolve")
    def categories():
        yield from fetch_categories()
    
    # Event data - filter bad rows
    @dlt.resource(schema_contract="discard_row")
    def events():
        yield from fetch_events()
    
    return transactions, categories, events

pipeline.run(mixed_source())

Contract Inheritance

Contracts flow from general to specific:
# 1. Pipeline run (highest priority)
pipeline.run(source(), schema_contract="freeze")

# 2. Source level
@dlt.source(schema_contract="discard_row")
def source():
    # 3. Resource level (lowest priority)
    @dlt.resource(schema_contract="evolve")
    def data():
        yield [...]
    
    return data

# Final contract = "freeze" (from pipeline.run)

Handling Contract Violations

import dlt
from dlt.common.exceptions import PipelineStepFailed

pipeline = dlt.pipeline(
    pipeline_name="validated",
    destination="duckdb",
    dataset_name="data"
)

try:
    pipeline.run(
        strict_resource(),
        schema_contract="freeze"
    )
except PipelineStepFailed as e:
    print(f"Schema contract violation: {e}")
    # Handle error (alert, log, retry with different contract, etc.)

Best Practices

# Development: flexible
schema_contract="evolve"

# Staging: moderate
schema_contract={
    "tables": "freeze",
    "columns": "evolve",
    "data_type": "discard_row"
}

# Production: strict
schema_contract="freeze"
from pydantic import BaseModel, validator

class User(BaseModel):
    id: int
    email: str
    
    @validator('email')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Invalid email')
        return v

@dlt.resource(columns=User)
def users():
    yield from fetch_users()
load_info = pipeline.run(resource())

# Check for schema changes
for package in load_info.load_packages:
    if package.schema_update:
        send_alert(f"Schema changed: {package.schema_update}")
@dlt.resource(
    schema_contract="freeze"
)
def critical_data():
    """PRODUCTION RESOURCE - DO NOT MODIFY SCHEMA
    
    Contract: freeze
    - No new tables
    - No new columns  
    - No type changes
    
    Contact: data-team@example.com for changes
    """
    yield from fetch_data()

Next Steps

Schema Evolution

Learn about automatic schema evolution

Resources

Deep dive into resources and hints

Build docs developers (and LLMs) love