Step Operators
Step operators allow you to run individual pipeline steps on custom infrastructure. While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where a specific step runs.
Overview
Step operators are useful when:
A specific step needs GPU resources (e.g., model training)
A step requires more compute resources than others
You want to run a step on different infrastructure (e.g., serverless)
A step has special requirements (e.g., specific hardware, libraries)
How Step Operators Work
When a pipeline runs:
The orchestrator executes most steps normally
When a step with a step operator is reached:
The orchestrator hands off execution to the step operator
The step operator runs the step on its configured infrastructure
Results are returned to the orchestrator
The orchestrator continues with the next step
This allows fine-grained control over where each step executes.
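The hand-off described above can be pictured with a small, self-contained model. This is only a conceptual sketch in plain Python, not how ZenML implements it internally: `run_pipeline`, `fake_k8s_operator`, and the tuple-based step list are hypothetical names invented for illustration.

```python
from typing import Callable, Dict, List, Optional, Tuple

StepFn = Callable[[object], object]
# Hypothetical: a "step operator" here is just a callable that runs a step
# function somewhere else and returns its result.
OperatorFn = Callable[[StepFn, object], object]


def run_pipeline(
    steps: List[Tuple[str, StepFn, Optional[str]]],
    step_operators: Dict[str, OperatorFn],
    initial_input: object = None,
) -> Tuple[object, List[str]]:
    """Run steps in order, delegating to a step operator when one is named."""
    log: List[str] = []
    value = initial_input
    for name, fn, operator_name in steps:
        if operator_name is None:
            # No step operator: the orchestrator runs the step itself.
            value = fn(value)
            log.append(f"{name}: orchestrator")
        else:
            # Hand off to the named step operator, then take the result back.
            value = step_operators[operator_name](fn, value)
            log.append(f"{name}: {operator_name}")
    return value, log


def fake_k8s_operator(fn: StepFn, value: object) -> object:
    # Stand-in for remote execution; a real operator would launch a job.
    return fn(value)


final_value, execution_log = run_pipeline(
    steps=[
        ("preprocess", lambda _: [1, 2, 3], None),
        ("train", lambda data: sum(data), "k8s_step_operator"),
        ("report", lambda total: f"total={total}", None),
    ],
    step_operators={"k8s_step_operator": fake_k8s_operator},
)
```

Only the `train` step is routed through the operator; the other two steps stay with the orchestrator, and the result flows back into the next step either way.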
Available Step Operators
Kubernetes Step Operator
Runs individual steps as Kubernetes jobs.
Installation:
zenml integration install kubernetes
Configuration:
zenml step-operator register k8s_step_operator --flavor=kubernetes \
--kubernetes_context=my-context \
--kubernetes_namespace=zenml
Requirements:
Kubernetes cluster access
Container registry in your stack
Configured kubectl context
Use cases:
Running GPU-intensive training steps
Steps requiring specific node pools
Isolated execution environments
Resource-intensive data processing
Example:
from zenml import step, pipeline
from zenml.config import ResourceSettings

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=1,
        )
    },
)
def gpu_training_step(data) -> Model:
    # This step runs on Kubernetes with GPU
    model = train_on_gpu(data)
    return model

@step
def preprocessing_step() -> Data:
    # This step runs on the orchestrator
    data = load_and_preprocess()
    return data

@pipeline
def training_pipeline():
    data = preprocessing_step()
    model = gpu_training_step(data)
SageMaker Step Operator
Runs steps on AWS SageMaker Training Jobs.
Installation:
zenml integration install aws
Configuration:
zenml step-operator register sagemaker_step_operator --flavor=sagemaker \
--role=arn:aws:iam::123456789012:role/ZenMLSageMaker \
--instance_type=ml.p3.2xlarge
Requirements:
AWS account with SageMaker access
IAM role with SageMaker permissions
Container registry (ECR)
S3 artifact store
Features:
Managed infrastructure
Wide range of instance types
Spot instance support
Built-in monitoring
Auto-scaling capabilities
Use cases:
AWS-based ML infrastructure
GPU training jobs
Large-scale model training
Cost optimization with spot instances
Example:
from zenml import step
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

sagemaker_settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    max_runtime_in_seconds=3600,
    use_spot_instances=True,
)

@step(
    step_operator="sagemaker_step_operator",
    settings={"step_operator.sagemaker": sagemaker_settings},
)
def train_on_sagemaker(data) -> Model:
    # Runs on SageMaker training instance
    model = train_large_model(data)
    return model
Vertex AI Step Operator
Runs steps on Google Cloud Vertex AI Custom Jobs.
Installation:
zenml integration install gcp
Configuration:
zenml step-operator register vertex_step_operator --flavor=vertex \
--project=my-gcp-project \
--location=us-central1 \
--machine_type=n1-standard-4 \
--accelerator_type=NVIDIA_TESLA_T4 \
--accelerator_count=1
Requirements:
GCP project with Vertex AI enabled
Service account with Vertex AI permissions
Container registry (GCR/Artifact Registry)
GCS artifact store
Features:
Managed ML infrastructure
GPU and TPU support
Custom machine types
Pre-configured ML containers
Integration with Vertex AI ecosystem
Use cases:
GCP-based ML workflows
GPU/TPU training
Large-scale distributed training
Vertex AI platform integration
Example:
from zenml import step
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    accelerator_type="NVIDIA_TESLA_V100",
    accelerator_count=2,
)

@step(
    step_operator="vertex_step_operator",
    settings={"step_operator.vertex": vertex_settings},
)
def train_on_vertex(data) -> Model:
    # Runs on Vertex AI with 2 V100 GPUs
    model = distributed_training(data)
    return model
Azure ML Step Operator
Runs steps on Azure Machine Learning Compute.
Installation:
zenml integration install azure
Configuration:
zenml step-operator register azureml_step_operator --flavor=azureml \
--subscription_id=<subscription-id> \
--resource_group=<resource-group> \
--workspace_name=<workspace-name> \
--compute_target_name=gpu-cluster
Requirements:
Azure subscription
Azure ML workspace
Compute cluster or compute instance
Azure Container Registry
Azure Blob Storage artifact store
Features:
Managed compute resources
Auto-scaling clusters
GPU and CPU options
Cost management
Integration with Azure ML
Use cases:
Azure-based infrastructure
Enterprise Azure deployments
GPU training on Azure
Azure ML ecosystem integration
Modal Step Operator
Runs steps on Modal’s serverless infrastructure.
Installation:
zenml integration install modal
Configuration:
zenml step-operator register modal_step_operator --flavor=modal
Authentication:
Features:
Serverless execution
Pay-per-use pricing
Fast cold starts
GPU support
Automatic scaling
Use cases:
Serverless ML workflows
Sporadic GPU needs
Cost optimization
Quick experimentation
Example:
from zenml import step
from zenml.integrations.modal.flavors.modal_step_operator_flavor import (
    ModalStepOperatorSettings,
)

modal_settings = ModalStepOperatorSettings(
    gpu="any",  # Request any available GPU
    cpu=4.0,
    memory=16384,  # MB
)

@step(
    step_operator="modal_step_operator",
    settings={"step_operator.modal": modal_settings},
)
def train_on_modal(data) -> Model:
    # Runs serverless on Modal with GPU
    model = train(data)
    return model
Choosing a Step Operator
| Step Operator | Best For | Pricing Model | GPU Support |
| --- | --- | --- | --- |
| Kubernetes | Self-hosted, flexibility | Infrastructure cost | Yes (if cluster has GPUs) |
| SageMaker | AWS infrastructure | Per-second billing | Yes (wide selection) |
| Vertex AI | GCP infrastructure | Per-second billing | Yes (GPUs and TPUs) |
| Azure ML | Azure infrastructure | Per-minute billing | Yes (various SKUs) |
| Modal | Serverless, experimentation | Pay-per-use | Yes (on-demand) |
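As a rough illustration, the comparison above can be turned into a small lookup. The flavor names come from the `--flavor=` values used on this page. The selection rule itself (`suggest_flavor`) is a hypothetical simplification, not a ZenML feature, and real decisions will usually weigh more factors.

```python
from typing import Optional

# Maps a cloud provider to the step operator flavor registered on this page.
CLOUD_TO_FLAVOR = {
    "aws": "sagemaker",
    "gcp": "vertex",
    "azure": "azureml",
}


def suggest_flavor(cloud: Optional[str] = None, serverless: bool = False) -> str:
    """Suggest a flavor: Modal for serverless, else the cloud's managed option.

    Falls back to Kubernetes for self-hosted or unrecognized environments.
    """
    if serverless:
        return "modal"
    if cloud is None:
        return "kubernetes"
    return CLOUD_TO_FLAVOR.get(cloud.lower(), "kubernetes")


aws_choice = suggest_flavor("AWS")
self_hosted_choice = suggest_flavor()
serverless_choice = suggest_flavor("gcp", serverless=True)
```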
Resource Configuration
Specifying Resources
from zenml import step
from zenml.config import ResourceSettings

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=2,
        )
    },
)
def resource_intensive_step(data) -> Result:
    # This step gets the specified resources
    result = process_with_resources(data)
    return result
Cloud-Specific Resources
SageMaker:
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.8xlarge",  # 4 V100 GPUs
    max_runtime_in_seconds=7200,
    volume_size_in_gb=100,
    use_spot_instances=True,
    max_wait_time_in_seconds=86400,
)
Vertex AI:
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

settings = VertexStepOperatorSettings(
    machine_type="a2-highgpu-1g",  # A100 GPUs require an A2 machine type
    accelerator_type="NVIDIA_TESLA_A100",
    accelerator_count=1,
    disk_size_gb=200,
)
Mixed Infrastructure Pipelines
Combine different execution environments:
from zenml import step, pipeline

@step  # Runs on orchestrator
def load_data() -> Data:
    return load_from_database()

@step  # Runs on orchestrator
def preprocess(data: Data) -> Data:
    return clean_and_transform(data)

@step(step_operator="sagemaker_step_operator")  # Runs on SageMaker
def train_model(data: Data) -> Model:
    return train_large_model(data)

@step(step_operator="modal_step_operator")  # Runs on Modal
def generate_explanations(model: Model, data: Data) -> Explanations:
    return explain_model(model, data)

@step  # Runs on orchestrator
def save_results(model: Model, explanations: Explanations) -> None:
    save_to_registry(model)
    save_to_database(explanations)

@pipeline
def mixed_pipeline():
    data = load_data()
    processed = preprocess(data)
    model = train_model(processed)  # On SageMaker
    explanations = generate_explanations(model, processed)  # On Modal
    save_results(model, explanations)
Best Practices
Use Step Operators for Resource-Intensive Steps
import pandas as pd

# Good: GPU training on step operator
@step(step_operator="sagemaker_step_operator")
def train_deep_learning_model(data) -> Model:
    return train_on_gpu(data)

# Bad: Simple data loading on step operator (unnecessary overhead)
@step(step_operator="sagemaker_step_operator")
def load_csv() -> Data:
    return pd.read_csv("data.csv")
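One way to reason about the "unnecessary overhead" trade-off is a back-of-the-envelope break-even check. The sketch below is a simplification under stated assumptions: `worth_offloading` is a hypothetical helper, and the speedup and startup numbers are made up for illustration rather than measured on any real step operator.

```python
def worth_offloading(
    local_seconds: float,
    speedup: float,
    startup_overhead_seconds: float,
) -> bool:
    """Return True if running remotely is expected to finish sooner.

    Remote time is modeled as a fixed startup cost (image pull, instance
    provisioning) plus the local runtime divided by the hardware speedup.
    """
    if speedup <= 0:
        raise ValueError("speedup must be positive")
    remote_seconds = startup_overhead_seconds + local_seconds / speedup
    return remote_seconds < local_seconds


# Hypothetical numbers: a one-hour training job that runs 10x faster on a GPU,
# with five minutes of startup overhead.
training_pays_off = worth_offloading(3600, speedup=10, startup_overhead_seconds=300)

# Hypothetical numbers: a five-second CSV load gains nothing from a GPU.
csv_load_pays_off = worth_offloading(5, speedup=1, startup_overhead_seconds=300)
```

This only compares wall-clock time; cost per hour and data transfer would also matter in practice.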
Minimize Data Transfer
# Good: Process data close to where it's stored
@step(step_operator="vertex_step_operator")
def process_large_dataset() -> ProcessedData:
    # Load from GCS (close to Vertex AI)
    data = load_from_gcs()
    return process(data)

# Consider: May involve unnecessary data transfer
@step(step_operator="vertex_step_operator")
def process_small_preprocessed_data(data: SmallData) -> Result:
    # Small data, might not need step operator
    return simple_transform(data)
Set Appropriate Timeouts
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

# Set appropriate timeouts
settings = SageMakerStepOperatorSettings(
    max_runtime_in_seconds=3600,  # 1 hour max
    max_wait_time_in_seconds=7200,  # For spot instances
)
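A quick sanity check on the two timeouts can catch misconfiguration before a job is submitted. SageMaker's training-job API requires the maximum wait time to be at least the maximum runtime when managed spot training is used. Whether the settings fields shown here map one-to-one onto those API fields is an assumption, and `timeouts_are_consistent` is a hypothetical helper, not part of ZenML.

```python
from typing import Optional


def timeouts_are_consistent(
    max_runtime_in_seconds: int,
    max_wait_time_in_seconds: Optional[int] = None,
) -> bool:
    """Check that the wait time (if set) is not shorter than the runtime."""
    if max_runtime_in_seconds <= 0:
        return False
    if max_wait_time_in_seconds is None:
        # No wait time configured, so there is nothing to compare against.
        return True
    return max_wait_time_in_seconds >= max_runtime_in_seconds


valid_config = timeouts_are_consistent(3600, 7200)
invalid_config = timeouts_are_consistent(7200, 3600)
```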
Use Spot/Preemptible Instances
# SageMaker spot instances (up to 90% savings)
sagemaker_settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    use_spot_instances=True,
    max_wait_time_in_seconds=3600,
)

# GCP preemptible instances
vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    use_preemptible=True,
)
Monitoring Step Execution
Check Step Status
from zenml.client import Client

client = Client()
# get_pipeline_run takes a single run name, ID, or prefix
run = client.get_pipeline_run("run_name")

for step_name, step in run.steps.items():
    print(f"Step: {step_name}")
    print(f"Status: {step.status}")
    print(f"Duration: {step.duration}")
    # Check if step used a step operator
    if step.config.step_operator:
        print(f"Step Operator: {step.config.step_operator}")
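For a run with many steps, grouping step names by where they executed can be easier to read than a flat printout. The helper below is a sketch that relies only on the `step.config.step_operator` attribute used in the snippet above. It is exercised here with `SimpleNamespace` stand-ins rather than real ZenML objects, and `steps_by_operator` is a hypothetical name.

```python
from types import SimpleNamespace
from typing import Any, Dict, List, Mapping


def steps_by_operator(steps: Mapping[str, Any]) -> Dict[str, List[str]]:
    """Group step names by step operator; steps without one go under 'orchestrator'."""
    grouped: Dict[str, List[str]] = {}
    for step_name, step in steps.items():
        operator = step.config.step_operator or "orchestrator"
        grouped.setdefault(operator, []).append(step_name)
    return grouped


# Stand-in objects that mimic the attribute layout used above (assumption).
fake_steps = {
    "load_data": SimpleNamespace(config=SimpleNamespace(step_operator=None)),
    "train_model": SimpleNamespace(
        config=SimpleNamespace(step_operator="sagemaker_step_operator")
    ),
    "save_results": SimpleNamespace(config=SimpleNamespace(step_operator=None)),
}

grouped = steps_by_operator(fake_steps)
```

With a real run, `run.steps` could be passed in place of `fake_steps`, assuming the step objects expose the same attributes.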
Cloud Console Monitoring
SageMaker:
AWS Console → SageMaker → Training jobs
View logs in CloudWatch
Vertex AI:
GCP Console → Vertex AI → Custom Jobs
View logs in Cloud Logging
Azure ML:
Azure Portal → Machine Learning → Experiments
View logs in workspace
Troubleshooting
Step Operator Not Found
# List registered step operators
zenml step-operator list
# Describe specific step operator
zenml step-operator describe k8s_step_operator
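A typo in a `step_operator` name is a common cause of this error. As a sketch, the names referenced by your steps can be compared against the names shown by `zenml step-operator list`. The helper `find_unregistered` and the hard-coded inputs below are hypothetical and stand in for values you would collect yourself.

```python
from typing import Dict, Iterable, Optional


def find_unregistered(
    step_to_operator: Dict[str, Optional[str]],
    registered: Iterable[str],
) -> Dict[str, str]:
    """Return steps whose step operator name is not in the registered set."""
    registered_set = set(registered)
    return {
        step: operator
        for step, operator in step_to_operator.items()
        if operator is not None and operator not in registered_set
    }


# Hypothetical inputs: note the misspelled operator on the second step.
missing = find_unregistered(
    step_to_operator={
        "train_model": "sagemaker_step_operator",
        "generate_explanations": "modal_step_opertor",
        "load_data": None,
    },
    registered=["sagemaker_step_operator", "modal_step_operator"],
)
```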
Permission Errors
# SageMaker: Check IAM role
aws iam get-role --role-name ZenMLSageMaker

# Vertex AI: Check service account
gcloud projects get-iam-policy PROJECT_ID \
--flatten="bindings[].members" \
--filter="bindings.members:serviceAccount:*"

# Azure: Check permissions
az role assignment list --assignee <service-principal-id>
Resource Limits
# AWS: Check SageMaker quotas
aws service-quotas list-service-quotas \
--service-code sagemaker

# GCP: Check project quotas (this lists Compute Engine quotas, not Vertex AI-specific ones)
gcloud compute project-info describe \
--format="value(quotas)"

# Azure: Check subscription limits
az vm list-usage --location eastus
Container Build Failures
# Enable debug logging
import os

os.environ["ZENML_LOGGING_VERBOSITY"] = "DEBUG"

# Check container registry access
# Ensure your step operator can pull images
Cost Optimization
Use Spot/Preemptible Instances
Save up to 90% on compute costs:
# SageMaker
sagemaker_settings = SageMakerStepOperatorSettings(
    use_spot_instances=True,
    max_wait_time_in_seconds=3600,
)

# Vertex AI
vertex_settings = VertexStepOperatorSettings(
    use_preemptible=True,
)
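To make the "up to 90%" figure concrete, the arithmetic can be sketched as follows. The hourly rate is a made-up placeholder, not a real price, and actual spot discounts vary by instance type, region, and time. `spot_cost` is a hypothetical helper.

```python
def spot_cost(on_demand_hourly_usd: float, hours: float, discount: float) -> float:
    """Estimate the cost of a job at a given spot discount (0.0 to 1.0)."""
    if not 0.0 <= discount <= 1.0:
        raise ValueError("discount must be between 0 and 1")
    return on_demand_hourly_usd * hours * (1.0 - discount)


# Hypothetical: a 10-hour job on an instance billed at 4.00 USD/hour on demand.
on_demand_total = spot_cost(4.0, hours=10, discount=0.0)
best_case_spot_total = spot_cost(4.0, hours=10, discount=0.9)
savings = on_demand_total - best_case_spot_total
```

Spot and preemptible instances can be interrupted, so the realized saving also depends on how much work has to be repeated after an interruption.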
Right-Size Resources
# Don't over-provision
settings = SageMakerStepOperatorSettings(
    instance_type="ml.m5.xlarge",  # Start small
    # Scale up only if needed
)
Set Timeouts
# Prevent runaway costs
settings = SageMakerStepOperatorSettings(
    max_runtime_in_seconds=3600,  # Kill after 1 hour
)
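A runtime limit also puts a ceiling on what a single step can cost. The sketch below shows that bound. The hourly rate is a hypothetical placeholder, and `max_step_cost` is an illustrative helper rather than anything ZenML provides.

```python
def max_step_cost(hourly_rate_usd: float, max_runtime_in_seconds: int) -> float:
    """Upper bound on compute cost for one step, given its runtime limit."""
    if max_runtime_in_seconds < 0:
        raise ValueError("max_runtime_in_seconds must be non-negative")
    return hourly_rate_usd * max_runtime_in_seconds / 3600


# Hypothetical rate of 4.00 USD/hour.
one_hour_cap = max_step_cost(4.0, 3600)
two_hour_cap = max_step_cost(4.0, 7200)
```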
Next Steps
Stack Components Overview: Learn about other stack components
Advanced Pipelines: Build sophisticated ML pipelines