
Step Operators

Step operators allow you to run individual pipeline steps on custom infrastructure. While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where a specific step runs.

Overview

Step operators are useful when:
  • A specific step needs GPU resources (e.g., model training)
  • A step requires more compute resources than others
  • You want to run a step on different infrastructure (e.g., serverless)
  • A step has special requirements (e.g., specific hardware, libraries)

How Step Operators Work

When a pipeline runs:
  1. The orchestrator runs every step without a step operator in its usual environment
  2. When it reaches a step configured with a step operator:
    • The orchestrator hands off execution to the step operator
    • The step operator runs the step on its configured infrastructure
    • The step's outputs are written to the artifact store, and the orchestrator waits for the step to finish
  3. The orchestrator continues with the next step
This allows fine-grained control over where each step executes.
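The hand-off can be pictured with a toy model of the control flow above in plain Python. Every name in it (`ToyStep`, `ToyOrchestrator`, `fake_k8s_job`) is hypothetical. A dict stands in for the artifact store, and the "remote" job simply runs in-process. It sketches the idea only and is not ZenML's implementation.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

# Toy model of the orchestrator/step operator hand-off.
# Hypothetical names only; this is not how ZenML is implemented.


@dataclass
class ToyStep:
    name: str
    fn: Callable[[Dict[str, object]], object]  # reads upstream outputs, returns its own
    step_operator: Optional[str] = None  # None means "run on the orchestrator"


@dataclass
class ToyOrchestrator:
    # Maps a step operator name to a callable that "runs the step remotely".
    step_operators: Dict[str, Callable[[ToyStep, Dict[str, object]], object]]
    artifact_store: Dict[str, object] = field(default_factory=dict)
    execution_log: List[str] = field(default_factory=list)

    def run(self, steps: List[ToyStep]) -> None:
        for step in steps:
            if step.step_operator is None:
                where = "orchestrator"
                output = step.fn(self.artifact_store)
            else:
                where = step.step_operator
                # Hand off and block until the (fake) remote job finishes.
                output = self.step_operators[where](step, self.artifact_store)
            # Outputs land in the artifact store, where later steps read them.
            self.artifact_store[step.name] = output
            self.execution_log.append(f"{step.name}@{where}")


def fake_k8s_job(step: ToyStep, store: Dict[str, object]) -> object:
    # Stand-in for submitting a job to a cluster and waiting for it.
    return step.fn(store)


orchestrator = ToyOrchestrator(step_operators={"k8s_step_operator": fake_k8s_job})
orchestrator.run(
    [
        ToyStep("preprocess", lambda store: [1, 2, 3]),
        ToyStep("train", lambda store: sum(store["preprocess"]), "k8s_step_operator"),
        ToyStep("evaluate", lambda store: store["train"] * 2),
    ]
)
print(orchestrator.execution_log)
# ['preprocess@orchestrator', 'train@k8s_step_operator', 'evaluate@orchestrator']
print(orchestrator.artifact_store["evaluate"])  # 12
```

Only `train` is dispatched to the (fake) step operator. The other two steps run in the orchestrator's own environment, and all three exchange data through the shared store.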

Available Step Operators

Kubernetes Step Operator

Runs individual steps as Kubernetes jobs. Installation:
zenml integration install kubernetes
Configuration:
zenml step-operator register k8s_step_operator --flavor=kubernetes \
  --kubernetes_context=my-context \
  --kubernetes_namespace=zenml
Requirements:
  • Kubernetes cluster access
  • Container registry in your stack
  • Configured kubectl context
Use cases:
  • Running GPU-intensive training steps
  • Steps requiring specific node pools
  • Isolated execution environments
  • Resource-intensive data processing
Example:
from typing import Any

from zenml import step, pipeline
from zenml.config import ResourceSettings

# train_on_gpu and load_and_preprocess are placeholders for your own code.

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=1,
        )
    },
)
def gpu_training_step(data: Any) -> Any:
    # Runs as a Kubernetes job with the requested CPUs, memory and GPU
    model = train_on_gpu(data)
    return model

@step
def preprocessing_step() -> Any:
    # No step operator: this step runs on the orchestrator
    data = load_and_preprocess()
    return data

@pipeline
def training_pipeline():
    data = preprocessing_step()
    model = gpu_training_step(data)
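For the Kubernetes flavor, resource settings like these end up in a pod specification. The sketch below builds a minimal `batch/v1` Job manifest as a plain dict to show roughly what `cpu_count=8`, `memory="32GB"` and `gpu_count=1` translate to. The function name `build_job_manifest`, the image URL and the memory-string conversion are assumptions for illustration. The manifest ZenML actually generates contains more fields.

```python
from typing import Any, Dict


def build_job_manifest(
    name: str,
    image: str,
    cpu_count: int,
    memory: str,
    gpu_count: int = 0,
    namespace: str = "zenml",
) -> Dict[str, Any]:
    """Build a minimal Kubernetes Job manifest for one step (illustrative only)."""
    # Assumption: "32GB" maps to the decimal Kubernetes quantity "32G",
    # and binary suffixes such as "4GiB" map to "4Gi".
    k8s_memory = memory.replace("iB", "i").replace("B", "")
    limits: Dict[str, str] = {"cpu": str(cpu_count), "memory": k8s_memory}
    if gpu_count:
        # GPUs are an extended resource exposed by the NVIDIA device plugin.
        limits["nvidia.com/gpu"] = str(gpu_count)
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "backoffLimit": 0,  # do not retry a failed step automatically
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "main",
                            "image": image,
                            # Requests equal limits, as required for GPUs.
                            "resources": {"requests": dict(limits), "limits": limits},
                        }
                    ],
                }
            },
        },
    }


manifest = build_job_manifest(
    "gpu-training-step",
    "registry.example.com/zenml:latest",  # hypothetical image
    cpu_count=8,
    memory="32GB",
    gpu_count=1,
)
resources = manifest["spec"]["template"]["spec"]["containers"][0]["resources"]
print(resources["limits"])  # {'cpu': '8', 'memory': '32G', 'nvidia.com/gpu': '1'}
```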

SageMaker Step Operator

Runs steps on AWS SageMaker Training Jobs. Installation:
zenml integration install aws
Configuration:
zenml step-operator register sagemaker_step_operator --flavor=sagemaker \
  --role=arn:aws:iam::123456789012:role/ZenMLSageMaker \
  --instance_type=ml.p3.2xlarge
Requirements:
  • AWS account with SageMaker access
  • IAM role with SageMaker permissions
  • Container registry (ECR)
  • S3 artifact store
Features:
  • Managed infrastructure
  • Wide range of instance types
  • Spot instance support
  • Built-in monitoring
  • Auto-scaling capabilities
Use cases:
  • AWS-based ML infrastructure
  • GPU training jobs
  • Large-scale model training
  • Cost optimization with spot instances
Example:
from typing import Any

from zenml import step
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SagemakerStepOperatorSettings,
)

sagemaker_settings = SagemakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    # Passed through to the SageMaker Estimator
    estimator_args={
        "max_run": 3600,
        "use_spot_instances": True,
        "max_wait": 7200,  # required for spot; must be >= max_run
    },
)

@step(
    step_operator="sagemaker_step_operator",
    settings={"step_operator.sagemaker": sagemaker_settings},
)
def train_on_sagemaker(data: Any) -> Any:
    # Runs on a SageMaker training instance;
    # train_large_model is a placeholder for your own training code
    model = train_large_model(data)
    return model

Vertex AI Step Operator

Runs steps on Google Cloud Vertex AI Custom Jobs. Installation:
zenml integration install gcp
Configuration:
zenml step-operator register vertex_step_operator --flavor=vertex \
  --project=my-gcp-project \
  --region=us-central1 \
  --machine_type=n1-standard-4 \
  --accelerator_type=NVIDIA_TESLA_T4 \
  --accelerator_count=1
Requirements:
  • GCP project with Vertex AI enabled
  • Service account with Vertex AI permissions
  • Container registry (GCR/Artifact Registry)
  • GCS artifact store
Features:
  • Managed ML infrastructure
  • GPU and TPU support
  • Custom machine types
  • Pre-configured ML containers
  • Integration with Vertex AI ecosystem
Use cases:
  • GCP-based ML workflows
  • GPU/TPU training
  • Large-scale distributed training
  • Vertex AI platform integration
Example:
from zenml import step
# Model and distributed_training are placeholders for your own type and code.
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    accelerator_type="NVIDIA_TESLA_V100",
    accelerator_count=2,
)

@step(
    step_operator="vertex_step_operator",
    settings={"step_operator.vertex": vertex_settings},
)
def train_on_vertex(data) -> Model:
    # Runs on Vertex AI with 2 V100 GPUs
    model = distributed_training(data)
    return model

Azure ML Step Operator

Runs steps on Azure Machine Learning Compute. Installation:
zenml integration install azure
Configuration:
zenml step-operator register azureml_step_operator --flavor=azureml \
  --subscription_id=<subscription-id> \
  --resource_group=<resource-group> \
  --workspace_name=<workspace-name> \
  --compute_target_name=gpu-cluster
Requirements:
  • Azure subscription
  • Azure ML workspace
  • Compute cluster or compute instance
  • Azure Container Registry
  • Azure Blob Storage artifact store
Features:
  • Managed compute resources
  • Auto-scaling clusters
  • GPU and CPU options
  • Cost management
  • Integration with Azure ML
Use cases:
  • Azure-based infrastructure
  • Enterprise Azure deployments
  • GPU training on Azure
  • Azure ML ecosystem integration

Modal Step Operator

Runs steps on Modal’s serverless infrastructure. Installation:
zenml integration install modal
Configuration:
zenml step-operator register modal_step_operator --flavor=modal
Authentication:
modal token new
Features:
  • Serverless execution
  • Pay-per-use pricing
  • Fast cold starts
  • GPU support
  • Automatic scaling
Use cases:
  • Serverless ML workflows
  • Sporadic GPU needs
  • Cost optimization
  • Quick experimentation
Example:
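from zenml import step
from zenml.config import ResourceSettings
from zenml.integrations.modal.flavors.modal_step_operator_flavor import (
    ModalStepOperatorSettings,
)

# Modal-specific options such as the GPU type
modal_settings = ModalStepOperatorSettings(
    gpu="any",  # Request any available GPU
)

# CPU and memory are requested through ZenML's generic resource settings
resources = ResourceSettings(cpu_count=4, memory="16GB")

@step(
    step_operator="modal_step_operator",
    settings={"step_operator.modal": modal_settings, "resources": resources},
)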
def train_on_modal(data) -> Model:
    # Runs serverless on Modal with GPU
    model = train(data)
    return model

Choosing a Step Operator

| Step Operator | Best For                    | Pricing Model       | GPU Support               |
|---------------|-----------------------------|---------------------|---------------------------|
| Kubernetes    | Self-hosted, flexibility    | Infrastructure cost | Yes (if cluster has GPUs) |
| SageMaker     | AWS infrastructure          | Per-second billing  | Yes (wide selection)      |
| Vertex AI     | GCP infrastructure          | Per-second billing  | Yes (GPUs and TPUs)       |
| Azure ML      | Azure infrastructure        | Per-minute billing  | Yes (various SKUs)        |
| Modal         | Serverless, experimentation | Pay-per-use         | Yes (on-demand)           |

Resource Configuration

Specifying Resources

from zenml import step
# Result and process_with_resources are placeholders for your own type and code.
from zenml.config import ResourceSettings

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=2,
        )
    },
)
def resource_intensive_step(data) -> Result:
    # This step gets the specified resources
    result = process_with_resources(data)
    return result

Cloud-Specific Resources

SageMaker:
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SagemakerStepOperatorSettings,
)

settings = SagemakerStepOperatorSettings(
    instance_type="ml.p3.8xlarge",  # 4 V100 GPUs
    # Passed through to the SageMaker Estimator
    estimator_args={
        "max_run": 7200,
        "volume_size": 100,  # GB of attached storage
        "use_spot_instances": True,
        "max_wait": 86400,  # must be >= max_run
    },
)
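SageMaker settings like these end up in a CreateTrainingJob request. The hypothetical helper `sagemaker_job_fields` below assembles the relevant request fields and enforces one rule that often trips up spot configurations: the maximum wait time must be at least the maximum runtime. The field names `ResourceConfig`, `StoppingCondition` and `EnableManagedSpotTraining` come from the SageMaker API. In practice the request is built for you, so the helper only illustrates the mapping.

```python
from typing import Any, Dict, Optional


def sagemaker_job_fields(
    instance_type: str,
    volume_size_in_gb: int,
    max_runtime_in_seconds: int,
    use_spot_instances: bool = False,
    max_wait_time_in_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the resource and stopping fields of a CreateTrainingJob request."""
    stopping: Dict[str, int] = {"MaxRuntimeInSeconds": max_runtime_in_seconds}
    if use_spot_instances:
        # The wait limit covers time waiting for spot capacity plus runtime,
        # so it has to be at least as long as the runtime limit.
        if max_wait_time_in_seconds is None:
            raise ValueError("spot training requires max_wait_time_in_seconds")
        if max_wait_time_in_seconds < max_runtime_in_seconds:
            raise ValueError("max wait time must be >= max runtime")
        stopping["MaxWaitTimeInSeconds"] = max_wait_time_in_seconds
    return {
        "ResourceConfig": {
            "InstanceType": instance_type,
            "InstanceCount": 1,  # single-instance job, as in the examples above
            "VolumeSizeInGB": volume_size_in_gb,
        },
        "StoppingCondition": stopping,
        "EnableManagedSpotTraining": use_spot_instances,
    }


fields = sagemaker_job_fields("ml.p3.8xlarge", 100, 7200, True, 86400)
print(fields["StoppingCondition"])
# {'MaxRuntimeInSeconds': 7200, 'MaxWaitTimeInSeconds': 86400}
```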
Vertex AI:
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

settings = VertexStepOperatorSettings(
    machine_type="a2-highgpu-1g",  # A100 GPUs require an A2 machine type
    accelerator_type="NVIDIA_TESLA_A100",
    accelerator_count=1,
    boot_disk_size_gb=200,
)
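Vertex AI rejects accelerator types that the chosen machine series cannot host. A100 GPUs, for example, are only available on A2 machine types, not on N1. A small pre-flight check can catch this before a job is submitted. The lookup table is a partial excerpt written for this sketch, so check Google Cloud's GPU documentation for the full, current list. `check_vertex_machine` is a hypothetical helper.

```python
from typing import Optional

# Partial excerpt: which Compute Engine machine series can host each GPU type.
GPU_MACHINE_SERIES = {
    "NVIDIA_TESLA_T4": {"n1"},
    "NVIDIA_TESLA_P100": {"n1"},
    "NVIDIA_TESLA_V100": {"n1"},
    "NVIDIA_TESLA_A100": {"a2"},
    "NVIDIA_L4": {"g2"},
}


def check_vertex_machine(machine_type: str, accelerator_type: Optional[str]) -> None:
    """Raise ValueError if accelerator_type cannot be attached to machine_type."""
    if accelerator_type is None:
        return  # CPU-only job, nothing to check
    allowed = GPU_MACHINE_SERIES.get(accelerator_type)
    if allowed is None:
        raise ValueError(f"{accelerator_type} is not in this excerpt; check the GCP docs")
    series = machine_type.split("-", 1)[0]  # "n1-highmem-8" -> "n1"
    if series not in allowed:
        raise ValueError(
            f"{accelerator_type} requires machine series {sorted(allowed)}, "
            f"got {machine_type}"
        )


check_vertex_machine("n1-standard-8", "NVIDIA_TESLA_V100")  # passes
check_vertex_machine("a2-highgpu-1g", "NVIDIA_TESLA_A100")  # passes
try:
    check_vertex_machine("n1-highmem-8", "NVIDIA_TESLA_A100")
except ValueError as err:
    print(err)  # NVIDIA_TESLA_A100 requires machine series ['a2'], got n1-highmem-8
```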

Mixed Infrastructure Pipelines

Combine different execution environments:
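from zenml import step, pipeline

# Data, Model, Explanations and the helper functions are placeholders
# for your own types and code.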

@step  # Runs on orchestrator
def load_data() -> Data:
    return load_from_database()

@step  # Runs on orchestrator
def preprocess(data: Data) -> Data:
    return clean_and_transform(data)

@step(step_operator="sagemaker_step_operator")  # Runs on SageMaker
def train_model(data: Data) -> Model:
    return train_large_model(data)

@step(step_operator="modal_step_operator")  # Runs on Modal
def generate_explanations(model: Model, data: Data) -> Explanations:
    return explain_model(model, data)

@step  # Runs on orchestrator
def save_results(model: Model, explanations: Explanations) -> None:
    save_to_registry(model)
    save_to_database(explanations)

@pipeline
def mixed_pipeline():
    data = load_data()
    processed = preprocess(data)
    model = train_model(processed)  # On SageMaker
    explanations = generate_explanations(model, processed)  # On Modal
    save_results(model, explanations)

Best Practices

Use Step Operators for Resource-Intensive Steps

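import pandas as pd
from zenml import step

# Good: GPU training on a step operator
# (Data, Model and train_on_gpu are placeholders for your own code)
@step(step_operator="sagemaker_step_operator")
def train_deep_learning_model(data: Data) -> Model:
    return train_on_gpu(data)

# Bad: simple data loading on a step operator; provisioning a training
# job and pulling the image take far longer than reading the file
@step(step_operator="sagemaker_step_operator")
def load_csv() -> pd.DataFrame:
    return pd.read_csv("data.csv")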

Minimize Data Transfer

# Good: process data close to where it's stored
# (ProcessedData, SmallData, Result and the helpers are placeholders)
@step(step_operator="vertex_step_operator")
def process_large_dataset() -> ProcessedData:
    # Load from GCS, which is in the same cloud as Vertex AI
    data = load_from_gcs()
    return process(data)

# Questionable: a light transform on small data gains little from a step
# operator; job startup and artifact transfer likely outweigh the compute
@step(step_operator="vertex_step_operator")
def process_small_preprocessed_data(data: SmallData) -> Result:
    return simple_transform(data)
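Both guidelines come down to weighing a step operator's fixed overhead (provisioning, image pull, moving the input artifact) against the time it saves. A rough wall-clock estimate can be sketched as follows. The function `offload_pays_off` and all numbers in the example are hypothetical, and the estimate ignores cost and queueing.

```python
def offload_pays_off(
    local_runtime_s: float,
    remote_runtime_s: float,
    startup_overhead_s: float,
    input_gb: float,
    transfer_gb_per_s: float,
) -> bool:
    """Rough wall-clock check: is running the step remotely faster overall?"""
    # Remote time = provisioning/image pull + moving the input + the compute itself.
    transfer_s = input_gb / transfer_gb_per_s
    remote_total_s = startup_overhead_s + transfer_s + remote_runtime_s
    return remote_total_s < local_runtime_s


# Hypothetical numbers, for illustration only.
# GPU training: 10 h locally vs 1 h remotely, 5 min startup, 50 GB input at 0.5 GB/s.
print(offload_pays_off(36_000, 3_600, 300, 50, 0.5))  # True
# Small transform: 5 s locally vs 4 s remotely, 3 min startup, 10 MB input.
print(offload_pays_off(5, 4, 180, 0.01, 0.1))  # False
```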

Configure Timeouts

from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SagemakerStepOperatorSettings,
)

# Set appropriate timeouts; estimator_args go to the SageMaker Estimator
settings = SagemakerStepOperatorSettings(
    estimator_args={
        "max_run": 3600,  # 1 hour max runtime
        "use_spot_instances": True,
        "max_wait": 7200,  # spot only: waiting for capacity plus runtime
    },
)

Use Spot/Preemptible Instances

# SageMaker spot instances (up to 90% savings)
sagemaker_settings = SagemakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    estimator_args={
        "use_spot_instances": True,
        "max_run": 3600,
        "max_wait": 3600,  # must be >= max_run, which defaults to 24 hours
    },
)

# GCP preemptible instances
vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    use_preemptible=True,
)

Monitoring Step Execution

Check Step Status

from zenml.client import Client

client = Client()
# get_pipeline_run takes a run name, ID or prefix;
# client.get_pipeline("training_pipeline").last_run returns the latest run instead
run = client.get_pipeline_run("run_name")

for step_name, step in run.steps.items():
    print(f"Step: {step_name}")
    print(f"Status: {step.status}")
    if step.start_time and step.end_time:
        print(f"Duration: {step.end_time - step.start_time}")

    # Check if step used a step operator
    if step.config.step_operator:
        print(f"Step Operator: {step.config.step_operator}")

Cloud Console Monitoring

SageMaker:
  • AWS Console → SageMaker → Training jobs
  • View logs in CloudWatch
Vertex AI:
  • GCP Console → Vertex AI → Custom Jobs
  • View logs in Cloud Logging
Azure ML:
  • Azure Portal → Machine Learning → Experiments
  • View logs in workspace

Troubleshooting

Step Operator Not Found

# List registered step operators
zenml step-operator list

# Describe specific step operator
zenml step-operator describe k8s_step_operator

Permission Errors

# SageMaker: Check IAM role
aws iam get-role --role-name ZenMLSageMaker

# Vertex AI: Check service account
gcloud projects get-iam-policy PROJECT_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:*"

# Azure: Check permissions
az role assignment list --assignee <service-principal-id>

Resource Limits

# AWS: Check SageMaker quotas
aws service-quotas list-service-quotas \
  --service-code sagemaker

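# GCP: Check project-wide and regional Compute Engine quotas (GPU quotas are
# regional); Vertex AI training quotas are listed on the console's Quotas page
gcloud compute project-info describe \
  --format="value(quotas)"
gcloud compute regions describe us-central1 \
  --format="value(quotas)"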

# Azure: Check subscription limits
az vm list-usage --location eastus

Container Build Failures

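# Enable debug logging. Set the variable before zenml is imported
# (or run `export ZENML_LOGGING_VERBOSITY=DEBUG` in your shell).
import os
os.environ["ZENML_LOGGING_VERBOSITY"] = "DEBUG"

# Check container registry access: the infrastructure your step operator
# runs on must be able to pull the image ZenML builds and pushes to the
# container registry in your stack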

Cost Optimization

Use Spot/Preemptible Instances

Save up to 90% on compute costs:
# SageMaker
sagemaker_settings = SagemakerStepOperatorSettings(
    estimator_args={
        "use_spot_instances": True,
        "max_run": 3600,
        "max_wait": 3600,  # must be >= max_run, which defaults to 24 hours
    },
)

# Vertex AI
vertex_settings = VertexStepOperatorSettings(
    use_preemptible=True,
)
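The "up to 90%" figure is a best case. Actual savings depend on the spot discount at the time and on how much work is repeated after interruptions. The sketch below estimates job cost under those two assumptions. The function `expected_cost` is hypothetical, and the hourly rate and discount are made-up placeholders, not current AWS or GCP prices.

```python
def expected_cost(
    on_demand_hourly: float,
    runtime_hours: float,
    spot_discount: float = 0.0,
    interruption_overhead: float = 0.0,
) -> float:
    """Estimated compute cost of one job.

    spot_discount: fraction off the on-demand price (0.0 = on-demand).
    interruption_overhead: extra billed runtime caused by interruptions,
        as a fraction of the uninterrupted runtime (e.g. 0.25 = 25% more).
    """
    if not 0.0 <= spot_discount < 1.0:
        raise ValueError("spot_discount must be in [0, 1)")
    if interruption_overhead < 0.0:
        raise ValueError("interruption_overhead must be >= 0")
    billed_hours = runtime_hours * (1.0 + interruption_overhead)
    return on_demand_hourly * (1.0 - spot_discount) * billed_hours


# Hypothetical: $4.00/hour on demand, 10-hour job.
on_demand = expected_cost(4.0, 10.0)
spot = expected_cost(4.0, 10.0, spot_discount=0.75, interruption_overhead=0.25)
print(on_demand, spot)  # 40.0 12.5
print(f"savings: {1 - spot / on_demand:.0%}")  # savings: 69%
```

With a 75% discount, re-running a quarter of the work after interruptions already pulls the net saving below 70%.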

Right-Size Resources

# Don't over-provision
settings = SagemakerStepOperatorSettings(
    instance_type="ml.m5.xlarge",  # Start small
    # Scale up only if needed
)
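Right-sizing means choosing the smallest instance that meets a step's measured needs. The sketch below does that over a short, hand-picked list of SageMaker instance types. The list, its smallest-first ordering, and the `smallest_fit` helper are assumptions for illustration, so verify specs and prices against AWS's current instance documentation.

```python
from typing import List, NamedTuple, Optional


class InstanceSpec(NamedTuple):
    name: str
    vcpus: int
    memory_gib: float
    gpus: int


# Small excerpt of SageMaker instance types, listed roughly smallest-first.
# Check current AWS specs and pricing before relying on this ordering.
CANDIDATES: List[InstanceSpec] = [
    InstanceSpec("ml.m5.large", 2, 8, 0),
    InstanceSpec("ml.m5.xlarge", 4, 16, 0),
    InstanceSpec("ml.m5.2xlarge", 8, 32, 0),
    InstanceSpec("ml.g4dn.xlarge", 4, 16, 1),
    InstanceSpec("ml.p3.2xlarge", 8, 61, 1),
    InstanceSpec("ml.p3.8xlarge", 32, 244, 4),
]


def smallest_fit(vcpus: int, memory_gib: float, gpus: int = 0) -> Optional[str]:
    """Return the first candidate that satisfies every requirement, if any."""
    for spec in CANDIDATES:
        if spec.vcpus >= vcpus and spec.memory_gib >= memory_gib and spec.gpus >= gpus:
            return spec.name
    return None  # nothing in the list is large enough


print(smallest_fit(4, 12))          # ml.m5.xlarge
print(smallest_fit(8, 32, gpus=1))  # ml.p3.2xlarge
```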

Set Timeouts

# Prevent runaway costs
settings = SagemakerStepOperatorSettings(
    estimator_args={"max_run": 3600},  # Stop the job after 1 hour
)
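A runtime limit also caps the worst-case compute bill, which makes it possible to derive a timeout from a budget. The helpers below are a sketch under simple assumptions: a flat hourly rate and per-second billing, with storage and data transfer ignored. The function names and the rate, budget and run count in the example are hypothetical.

```python
import math


def worst_case_cost(
    hourly_rate: float, max_runtime_in_seconds: int, runs: int, instance_count: int = 1
) -> float:
    """Compute cost if every run hits its timeout (per-second billing)."""
    return hourly_rate * instance_count * runs * max_runtime_in_seconds / 3600


def max_affordable_runtime_s(
    hourly_rate: float, budget: float, runs: int, instance_count: int = 1
) -> int:
    """Largest timeout, in whole seconds, that keeps the worst case within budget."""
    hours = budget / (hourly_rate * instance_count * runs)
    return math.floor(hours * 3600)


# Hypothetical numbers: $3.00/hour instance, 30 runs per month, $90 budget.
print(worst_case_cost(3.0, 3600, runs=30))                  # 90.0
print(max_affordable_runtime_s(3.0, budget=90.0, runs=30))  # 3600
```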

Next Steps

Stack Components Overview

Learn about other stack components

Advanced Pipelines

Build sophisticated ML pipelines
