Dynamic pipelines allow you to create execution graphs that adapt based on runtime conditions, data characteristics, or external inputs. Unlike static pipelines where the DAG is known before execution, dynamic pipelines determine their structure during runtime.
Static vs Dynamic Pipelines
Static Pipelines
In static pipelines, the complete execution graph is known before any step runs:
from zenml import pipeline, step


@step
def load_data() -> dict:
    return {"records": 100}


@step
def process_data(data: dict) -> dict:
    return {"processed": data["records"] * 2}


@step
def save_results(data: dict) -> None:
    print(f"Saved {data['processed']} records")


@pipeline
def static_pipeline():
    """All steps are known before execution starts."""
    data = load_data()
    processed = process_data(data)
    save_results(processed)
The orchestrator knows the full DAG:
load_data → process_data → save_results
Dynamic Pipelines
Dynamic pipelines determine which steps to run based on runtime conditions:
from zenml import pipeline, step
from zenml.config import ResourceSettings


@step
def check_data_size() -> int:
    """Determine data size at runtime."""
    # Could query a database, check a file size, etc.
    return 1000000  # 1M records


@step(settings={"resources": ResourceSettings(cpu_count=2, memory="4GB")})
def process_small_batch(batch_id: int) -> dict:
    return {"batch": batch_id, "processed": True}


@step(settings={"resources": ResourceSettings(cpu_count=8, memory="16GB")})
def process_large_batch(batch_id: int) -> dict:
    return {"batch": batch_id, "processed": True}


@pipeline
def dynamic_pipeline():
    """Steps are determined during execution."""
    size = check_data_size()
    # Branch based on the runtime value
    if size < 10000:
        # Small dataset: a few lightweight batch steps
        results = [process_small_batch(i) for i in range(3)]
    else:
        # Large dataset: more batch steps, each with larger resource requests
        results = [process_large_batch(i) for i in range(10)]
    return results
How Dynamic Pipelines Work
Orchestration Container
For dynamic pipelines, orchestrators launch an orchestration container that:
Runs the pipeline function to determine the execution graph
Submits discovered steps to the orchestration backend
Monitors step execution and handles failures
┌─────────────────────────────────────────┐
│         Orchestration Container         │
│                                         │
│  1. Run pipeline function               │
│  2. Discover steps dynamically          │
│  3. Submit steps to backend             │
└─────────────────────────────────────────┘
                     │
   ┌────────┬────────┼────────┬────────┐
   ▼        ▼        ▼        ▼        ▼
 Step 1   Step 2   Step 3   Step 4   Step 5
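The three responsibilities listed above can be sketched with the standard library alone. This is a toy model, not ZenML code: ToyBackend, pipeline_fn, fail, and run_orchestration are hypothetical names, and a thread pool stands in for a real orchestration backend.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


class ToyBackend:
    """Stand-in for an orchestration backend (hypothetical, not a ZenML class)."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self.futures: Dict[str, Future] = {}

    def submit(self, name: str, fn: Callable[[], object]) -> None:
        # Submit a step as soon as the pipeline function discovers it.
        self.futures[name] = self._pool.submit(fn)

    def wait(self) -> Dict[str, str]:
        # Monitor every submitted step and record success or failure.
        status = {}
        for name, future in self.futures.items():
            # Future.exception() blocks until the step has finished.
            status[name] = "failed" if future.exception() else "completed"
        self._pool.shutdown()
        return status


def fail() -> None:
    raise RuntimeError("simulated step failure")


def pipeline_fn(backend: ToyBackend) -> None:
    # Running the function is what discovers the steps.
    for i in range(3):
        backend.submit(f"step_{i}", lambda i=i: i * 2)
    backend.submit("step_bad", fail)


def run_orchestration() -> Dict[str, str]:
    backend = ToyBackend()
    pipeline_fn(backend)   # 1 and 2: run the function, discover steps
    return backend.wait()  # 3 plus monitoring: collect per-step status


if __name__ == "__main__":
    print(run_orchestration())
```

The point of the sketch is that the set of submitted steps does not exist until pipeline_fn runs, which is why a dynamic pipeline needs a long-lived coordinating process.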
Database Schema
Dynamic pipelines are stored differently in the database than static pipelines. The is_dynamic flag on a pipeline snapshot records which kind it is:
# From migration: af27025fe19c_dynamic_pipelines.py
# Pipeline snapshots have is_dynamic flag
pipeline_snapshot.is_dynamic = True # or False
# Step configurations can be linked to:
# - snapshot_id (static: all steps known upfront)
# - step_run_id (dynamic: steps discovered during execution)
This allows steps to be added to the pipeline as they’re discovered during execution.
Orchestrator Support
Not all orchestrators support dynamic pipelines. Check the supports_dynamic_pipelines property:
from zenml.orchestrators import BaseOrchestrator


class MyOrchestrator(BaseOrchestrator):
    @property
    def supports_dynamic_pipelines(self) -> bool:
        """Whether this orchestrator supports dynamic pipelines.

        Returns True if submit_dynamic_pipeline() is implemented.
        """
        return (
            getattr(self.submit_dynamic_pipeline, "__func__", None)
            is not BaseOrchestrator.submit_dynamic_pipeline
        )
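The check above relies on a general Python idiom: a bound method exposes the underlying function as __func__, so comparing it against the base class attribute reveals whether a subclass overrode the method. The following standalone sketch demonstrates the idiom with hypothetical classes (Base, StaticOnly, Dynamic) that are not part of ZenML.

```python
class Base:
    def submit_dynamic_pipeline(self) -> None:
        raise NotImplementedError

    @property
    def supports_dynamic_pipelines(self) -> bool:
        # True only when a subclass replaced Base.submit_dynamic_pipeline.
        return (
            getattr(self.submit_dynamic_pipeline, "__func__", None)
            is not Base.submit_dynamic_pipeline
        )


class StaticOnly(Base):
    """Inherits the base method unchanged."""


class Dynamic(Base):
    def submit_dynamic_pipeline(self) -> None:
        return None


if __name__ == "__main__":
    print(StaticOnly().supports_dynamic_pipelines)
    print(Dynamic().supports_dynamic_pipelines)
```

A consequence of this design is that implementing submit_dynamic_pipeline() is enough to advertise support; no separate flag has to be kept in sync.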
Implementing Dynamic Pipeline Support
To support dynamic pipelines in your custom orchestrator:
from typing import Dict, List, Optional

from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
from zenml.stack import Stack


class MyOrchestrator(ContainerizedOrchestrator):
    def submit_dynamic_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        environment: Dict[str, str],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a dynamic pipeline to the orchestration backend.

        Args:
            snapshot: Pipeline snapshot
            stack: The stack the pipeline runs on
            environment: Environment variables for the orchestration container
            placeholder_run: Optional placeholder run

        Returns:
            Submission result with optional wait callback
        """
        # Get the Docker image for the orchestration container
        image = self.get_image(snapshot)

        # The orchestration container runs the pipeline function,
        # which discovers and submits steps dynamically
        entrypoint = self._get_entrypoint_command()

        # Launch the orchestration container. _submit_job() and
        # _wait_for_job() are backend-specific helpers you implement yourself.
        job_id = self._submit_job(
            image=image,
            command=entrypoint,
            environment=environment,
            # The orchestration container only coordinates execution,
            # so modest resources are usually enough
            cpu_count=2,
            memory="4GB",
        )

        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_job(job_id)
        )

    def _get_entrypoint_command(self) -> List[str]:
        """Get command to run pipeline orchestration."""
        # Illustrative module path; check the entrypoint your ZenML version expects
        return [
            "python",
            "-m",
            "zenml.orchestrators.pipeline_runner",
        ]
Advanced Dynamic Patterns
Conditional Branching
Execute different step sequences based on data characteristics:
from zenml import pipeline, step


@step
def analyze_data() -> str:
    """Analyze data and determine processing strategy."""
    # Could check data quality, size, distribution, etc.
    # check_quality() is a placeholder for your own metric (float in [0, 1]).
    data_quality = check_quality()
    if data_quality < 0.5:
        return "needs_cleaning"
    elif data_quality < 0.8:
        return "needs_validation"
    else:
        return "ready"


@step
def clean_data(data: dict) -> dict:
    return {"cleaned": True}


@step
def validate_data(data: dict) -> dict:
    return {"validated": True}


@step
def process_data(data: dict) -> dict:
    return {"processed": True}


@pipeline
def adaptive_pipeline():
    """Adapt processing based on data quality."""
    quality = analyze_data()
    raw = {"status": "raw"}  # placeholder for data from an upstream step
    if quality == "needs_cleaning":
        data = clean_data(raw)
        data = validate_data(data)
    elif quality == "needs_validation":
        data = validate_data(raw)
    else:
        data = {"status": "ready"}
    result = process_data(data)
    return result
Dynamic Parallelism
Scale parallelism based on data characteristics:
from zenml import pipeline, step


@step
def count_partitions() -> int:
    """Determine optimal partition count."""
    # get_record_count() is a placeholder for your own data source query.
    total_records = get_record_count()
    optimal_batch_size = 10000
    # Ceiling division: round up so a partial batch gets its own partition
    return (total_records + optimal_batch_size - 1) // optimal_batch_size


@step
def process_partition(partition_id: int, total: int) -> dict:
    """Process a single partition."""
    return {
        "partition": partition_id,
        "total_partitions": total,
        "records_processed": 10000,
    }


@step
def merge_results(results: list) -> dict:
    """Merge results from all partitions."""
    total_processed = sum(r["records_processed"] for r in results)
    return {"total_processed": total_processed}


@pipeline
def dynamic_parallel_pipeline():
    """Process data with dynamic parallelism."""
    num_partitions = count_partitions()
    # Create partition steps dynamically
    results = [
        process_partition(i, num_partitions)
        for i in range(num_partitions)
    ]
    final_result = merge_results(results)
    return final_result
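The partition count above uses integer ceiling division. As a minimal standalone sketch of that arithmetic, the hypothetical helper partition_ranges (not part of ZenML) computes the half-open record range each partition would cover.

```python
from typing import List, Tuple


def partition_ranges(
    total_records: int, batch_size: int = 10_000
) -> List[Tuple[int, int]]:
    """Split [0, total_records) into half-open ranges of at most batch_size."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    # Ceiling division, same formula as count_partitions()
    num_partitions = (total_records + batch_size - 1) // batch_size
    return [
        (i * batch_size, min((i + 1) * batch_size, total_records))
        for i in range(num_partitions)
    ]


if __name__ == "__main__":
    for start, end in partition_ranges(25_000):
        print(start, end)
```

Note that the last range is usually shorter than batch_size. A step that reports a fixed 10000 records per partition, as process_partition does above, would overcount in that case, so passing explicit ranges to each step is one way to keep totals accurate.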
External Inputs
Make decisions based on external services:
from zenml import pipeline, step


@step
def check_external_api() -> dict:
    """Query external API for configuration."""
    import requests

    # A timeout keeps a slow service from stalling step discovery
    response = requests.get("https://api.example.com/config", timeout=10)
    response.raise_for_status()
    return response.json()


@step
def process_with_gpu(data: dict) -> dict:
    """GPU-accelerated processing."""
    return {"method": "gpu", "result": data}


@step
def process_with_cpu(data: dict) -> dict:
    """CPU-only processing."""
    return {"method": "cpu", "result": data}


@pipeline
def external_config_pipeline():
    """Use external configuration to determine execution."""
    config = check_external_api()
    if config.get("gpu_available"):
        result = process_with_gpu(config)
    else:
        result = process_with_cpu(config)
    return result
Resource Requirements
Orchestration Container Resources
The orchestration container typically needs minimal resources since it only coordinates execution:
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult


class MyOrchestrator(ContainerizedOrchestrator):
    def submit_dynamic_pipeline(
        self, snapshot, stack, environment, placeholder_run=None
    ):
        # Orchestration container: modest resources.
        # _submit_job() and _wait() are backend-specific helpers you implement.
        orchestration_job = self._submit_job(
            image=self.get_image(snapshot),
            cpu_count=1,  # Just needs to coordinate
            memory="2GB",
            environment=environment,
        )
        return SubmissionResult(
            wait_for_completion=lambda: self._wait(orchestration_job)
        )
Step Resources
Individual steps can have their own resource requirements:
from zenml import step
from zenml.config import ResourceSettings


@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=1,
        )
    }
)
def heavy_processing_step(data: dict) -> dict:
    """Step with significant resource requirements."""
    # GPU-accelerated processing
    return {"processed": True}
The get_orchestrator_run_id() Challenge
For dynamic pipelines, get_orchestrator_run_id() has simpler requirements than static pipelines:
import os
import socket


# Method of your orchestrator class
def get_orchestrator_run_id(self) -> str:
    """Return unique ID for orchestrator run.

    For dynamic pipelines:
    - Only needs to be unique within the orchestration container
    - Doesn't need to be shared across step containers
    - The orchestration container creates the pipeline run

    For static pipelines:
    - Must be the same for ALL steps in the pipeline run
    - Must be unique across different pipeline runs
    - Used by all steps to find the same pipeline run
    """
    # Static pipeline: use environment variable set when launching steps
    if "MY_ORCHESTRATOR_RUN_ID" in os.environ:
        return os.environ["MY_ORCHESTRATOR_RUN_ID"]
    # Dynamic pipeline: use container ID (only for orchestration container)
    return socket.gethostname()
The orchestration container creates the pipeline run, so individual steps don’t need to coordinate through the orchestrator run ID.
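For the static case, the shared ID has to be injected by whatever launches the step containers. The sketch below shows one way this could look, using only the standard library. The helper names step_environment and resolve_run_id are hypothetical; only the MY_ORCHESTRATOR_RUN_ID variable name comes from the example above.

```python
import socket
import uuid
from typing import Dict, Mapping, Optional

ENV_RUN_ID = "MY_ORCHESTRATOR_RUN_ID"


def step_environment(
    base: Mapping[str, str], run_id: Optional[str] = None
) -> Dict[str, str]:
    """Return a copy of base with the run ID every step container must share."""
    env = dict(base)
    # Generate the ID once per pipeline run and reuse the result for all steps.
    env[ENV_RUN_ID] = run_id or uuid.uuid4().hex
    return env


def resolve_run_id(environ: Mapping[str, str]) -> str:
    """Mirror get_orchestrator_run_id(): env var first, hostname as fallback."""
    return environ.get(ENV_RUN_ID) or socket.gethostname()


if __name__ == "__main__":
    shared = step_environment({"LOG_LEVEL": "INFO"})
    # Every step launched with `shared` resolves to the same run ID.
    print(resolve_run_id(shared) == shared[ENV_RUN_ID])
```

Passing the mapping explicitly instead of reading os.environ inside the helper keeps the logic easy to test without mutating the process environment.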
Best Practices
Minimize Orchestration Logic: Keep the orchestration container lightweight. Move heavy computation to steps.
Handle Failures Gracefully: Dynamic pipelines can fail during step discovery. Add error handling.
Document Decision Logic: Make it clear what runtime conditions trigger different execution paths.
Test Edge Cases: Test with boundary conditions that trigger different execution paths.
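As an illustration of handling failures during step discovery, the following sketch wraps an external lookup so that a failure falls back to a conservative default instead of aborting the run. All names here (DEFAULT_CONFIG, load_config_safely, choose_processor, unreachable_service) are hypothetical, and whether falling back is acceptable depends on your pipeline.

```python
import logging
from typing import Callable, Dict

logger = logging.getLogger(__name__)

# Conservative default used when the external lookup fails (assumption).
DEFAULT_CONFIG: Dict[str, bool] = {"gpu_available": False}


def load_config_safely(fetch: Callable[[], Dict[str, bool]]) -> Dict[str, bool]:
    """Return fetch() merged over the defaults, or the defaults on failure."""
    try:
        config = fetch()
    except Exception as exc:  # broad on purpose: any failure falls back
        logger.warning("Config lookup failed, using defaults: %s", exc)
        return dict(DEFAULT_CONFIG)
    return {**DEFAULT_CONFIG, **config}


def choose_processor(config: Dict[str, bool]) -> str:
    """Map the configuration to the name of the step that should run."""
    return "process_with_gpu" if config.get("gpu_available") else "process_with_cpu"


def unreachable_service() -> Dict[str, bool]:
    """Simulates an external service that cannot be reached."""
    raise ConnectionError("service unavailable")


if __name__ == "__main__":
    print(choose_processor(load_config_safely(unreachable_service)))
    print(choose_processor(load_config_safely(lambda: {"gpu_available": True})))
```

Logging the fallback also serves the "Document Decision Logic" practice, since the run log then records why a particular branch was taken.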
Testing Dynamic Pipelines
from zenml import pipeline, step


@step
def get_size(test_size: int) -> int:
    """Allow injecting size for testing."""
    return test_size


@step
def process_small(data: int) -> str:
    return f"small-{data}"


@step
def process_large(data: int) -> str:
    return f"large-{data}"


@pipeline
def testable_dynamic_pipeline(test_size: int):
    size = get_size(test_size)
    if size < 100:
        return process_small(size)
    else:
        return process_large(size)


# Test different branches (run with pytest). This assumes that calling the
# pipeline returns the pipeline run, so the tests inspect which steps ran
# rather than a step's return value.
def test_small_branch():
    run = testable_dynamic_pipeline(test_size=50)
    assert "process_small" in run.steps
    assert "process_large" not in run.steps


def test_large_branch():
    run = testable_dynamic_pipeline(test_size=200)
    assert "process_large" in run.steps
    assert "process_small" not in run.steps
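Boundary conditions are often cheaper to test when the branching rule lives in a plain function that needs no pipeline run at all. The sketch below assumes you are free to factor the rule out this way; select_branch is a hypothetical helper, not a ZenML API.

```python
def select_branch(size: int, threshold: int = 100) -> str:
    """Return the name of the step that should handle a dataset of this size."""
    if size < 0:
        raise ValueError("size must be non-negative")
    return "process_small" if size < threshold else "process_large"


def test_boundaries() -> None:
    # Values on either side of the threshold, plus the threshold itself
    assert select_branch(0) == "process_small"
    assert select_branch(99) == "process_small"
    assert select_branch(100) == "process_large"
    assert select_branch(101) == "process_large"


if __name__ == "__main__":
    test_boundaries()
    print("boundary checks passed")
```

The pipeline body can then call select_branch on the loaded size, so the unit tests and the real run exercise the same rule.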
Common Pitfalls
Non-Deterministic Pipelines: Avoid randomness in branching logic. Pipeline reruns should produce the same execution graph for the same inputs.
Expensive Orchestration: Don’t do heavy computation in the orchestration container. Move it to dedicated steps.
Assuming All Orchestrators Support Dynamic Pipelines: Always check orchestrator.supports_dynamic_pipelines before using dynamic features.
Next Steps
Custom Orchestrators: Build your own orchestrator with dynamic pipeline support
Resource Configuration: Configure CPU, memory, and GPU for dynamic steps
Containerization: Understand Docker image building for dynamic pipelines
Custom Materializers: Handle custom data types between dynamic steps