Documentation Index: Fetch the complete documentation index at https://mintlify.com/zenml-io/zenml/llms.txt
Use this file to discover all available pages before exploring further.
Resource configuration allows you to specify hardware requirements for individual pipeline steps. This ensures your steps run with appropriate computational resources, whether on local machines or cloud infrastructure.
Understanding Resource Settings
ZenML’s ResourceSettings class provides a unified interface for specifying:
CPU cores: Number of CPU cores allocated to a step
Memory: Amount of RAM allocated to a step
GPU: Number of GPU devices allocated to a step
Replicas: Minimum and maximum instance counts (for deployed pipelines)
Autoscaling: Metrics and targets for automatic scaling
Concurrency: Maximum concurrent requests per instance
Basic Resource Configuration
Configuring a Single Step
Specify resources when defining a step:
from zenml import step
from zenml.config import ResourceSettings
@step(
    settings={
        "resources": ResourceSettings(cpu_count=4, memory="8GB", gpu_count=0),
    }
)
def train_model(data: dict) -> dict:
    """Placeholder training step that requests 4 CPU cores, 8GB of RAM and no GPU.

    Args:
        data: Input data for training.

    Returns:
        A dict with a ``"model"`` entry describing the trained model.
    """
    # Real training logic would go here; the example returns a stub result.
    result = {"model": "trained"}
    return result
Configuring at Pipeline Level
Apply default resources to all steps:
from zenml import pipeline
from zenml.config import ResourceSettings
@pipeline(
    settings={
        "resources": ResourceSettings(cpu_count=2, memory="4GB"),
    }
)
def ml_pipeline():
    """Load, train and evaluate; every step defaults to 2 CPUs and 4GB RAM."""
    # The pipeline-level settings above apply to each step invoked here.
    model = train_model(load_data())
    evaluate_model(model)
Step-level settings override pipeline-level settings:
# Pipeline-wide default: 2 CPUs, 4GB RAM
@pipeline(
    settings={
        "resources": ResourceSettings(cpu_count=2, memory="4GB"),
    }
)
def ml_pipeline():
    """Pipeline whose training step overrides the pipeline-level resources."""
    # Runs with the pipeline default: 2 CPUs, 4GB
    data = load_data()

    # Step-level settings take precedence over the pipeline default,
    # so only the training step gets the larger allocation.
    training_resources = ResourceSettings(
        cpu_count=8,
        memory="16GB",
        gpu_count=1,
    )
    heavy_train_model = train_model.with_options(
        settings={"resources": training_resources}
    )
    model = heavy_train_model(data)

    # Runs with the pipeline default: 2 CPUs, 4GB
    evaluate_model(model)
Memory Specification
Memory Units
ZenML supports standard memory units:
from zenml.config import ResourceSettings
# Decimal units (powers of 1000)
ResourceSettings( memory = "1KB" ) # 1,000 bytes
ResourceSettings( memory = "1MB" ) # 1,000,000 bytes
ResourceSettings( memory = "1GB" ) # 1,000,000,000 bytes
ResourceSettings( memory = "1TB" ) # 1,000,000,000,000 bytes
# Binary units (powers of 1024)
ResourceSettings( memory = "1KiB" ) # 1,024 bytes
ResourceSettings( memory = "1MiB" ) # 1,048,576 bytes
ResourceSettings( memory = "1GiB" ) # 1,073,741,824 bytes
ResourceSettings( memory = "1TiB" ) # 1,099,511,627,776 bytes
Converting Memory Units
Programmatically convert between units:
from zenml.config import ResourceSettings, ByteUnit
resources = ResourceSettings(memory="8GB")

# Read the same memory request back in several units
memory_gb = resources.get_memory(unit=ByteUnit.GB)  # 8.0
memory_mb = resources.get_memory(unit=ByteUnit.MB)  # 8000.0
memory_gib = resources.get_memory(unit=ByteUnit.GIB)  # ~7.45

# Example: branch on the requested amount when configuring an orchestrator
if memory_gb and memory_gb > 10:
    print("Large memory requirement detected")
GPU Configuration
Requesting GPUs
from zenml import step
from zenml.config import ResourceSettings
@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            memory="16GB",
            gpu_count=1,  # Request 1 GPU
        ),
    }
)
def train_with_gpu(data: dict) -> dict:
    """Train on the GPU when CUDA is available, otherwise fall back to the CPU.

    Args:
        data: Input data for training.

    Returns:
        A dict holding the model under the ``"model"`` key.
    """
    import torch

    if torch.cuda.is_available():
        device = torch.device("cuda")
    else:
        device = torch.device("cpu")
    print(f"Training on: {device}")

    # Place the model on the selected device before training
    model = Model().to(device)
    # ... training logic ...
    return {"model": model}
Multi-GPU Training
@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=16,
            memory="64GB",
            gpu_count=4,  # Request 4 GPUs
        )
    }
)
def distributed_training(data: dict) -> dict:
    """Train a model, replicating it across all visible GPUs when there are several.

    Args:
        data: Input data for training.

    Returns:
        A dict holding the (possibly DataParallel-wrapped) model under the
        ``"model"`` key.
    """
    import torch

    # Create the model up front so it is defined on every code path,
    # including hosts with a single GPU or no GPU at all.
    # `Model` is a placeholder for your own torch.nn.Module subclass.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Model().to(device)

    gpu_total = torch.cuda.device_count()
    if gpu_total > 1:
        print(f"Using {gpu_total} GPUs")
        # DataParallel replicates the module across the visible GPUs and
        # expects its parameters to already live on the first device.
        model = torch.nn.DataParallel(model)
    # ... training logic ...
    return {"model": model}
Orchestrator Implementation
Reading Resource Settings
Orchestrators access resource settings from step configurations:
from zenml.orchestrators import BaseOrchestrator
from zenml.config.resource_settings import ResourceSettings
class MyOrchestrator(BaseOrchestrator):
    def submit_pipeline(
        self, snapshot, stack, base_environment, step_environments, placeholder_run
    ):
        """Submit each step of the pipeline with its resource configuration.

        Reads the ``ResourceSettings`` of every step in
        ``snapshot.step_configurations`` and hands the resolved CPU, memory
        and GPU values to ``self._submit_step``. Any value a step leaves
        unset falls back to the orchestrator default (1 CPU, 2048 MB, 0 GPUs).

        Args:
            snapshot: Pipeline snapshot whose step configurations are submitted.
            stack: Not used by this example.
            base_environment: Not used by this example.
            step_environments: Not used by this example.
            placeholder_run: Not used by this example.
        """
        for invocation_id, step in snapshot.step_configurations.items():
            # Get resource settings for this step
            resources: ResourceSettings = step.config.resource_settings

            # Apply the orchestrator default per field, so a step that only
            # sets e.g. cpu_count still gets a concrete memory value.
            cpu_count = resources.cpu_count or 1
            gpu_count = resources.gpu_count or 0

            memory_mb = 2048
            if resources.memory:
                # Ask for MB directly: "GB" is a decimal unit (10**9 bytes),
                # so scaling a GB value by 1024 would overstate the request.
                memory_mb = int(resources.get_memory(unit="MB"))

            # Submit step with resources
            self._submit_step(
                step_name=step.config.name,
                image=self.get_image(snapshot, step.config.name),
                cpu_count=cpu_count,
                memory_mb=memory_mb,
                gpu_count=gpu_count,
            )
Different platforms use different resource specifications:
class KubernetesOrchestrator(ContainerizedOrchestrator):
    def _build_pod_spec(self, step_config):
        """Build the Kubernetes resource requests/limits for a step.

        Args:
            step_config: Step configuration exposing ``resource_settings``.

        Returns:
            A dict with ``"requests"`` and ``"limits"`` mappings. CPU and
            memory are set to the same value in both; GPUs are only set
            under ``"limits"``. Unset resources are omitted.
        """
        resources = step_config.resource_settings

        # Kubernetes format
        requests = {}
        limits = {}

        # CPU (in cores)
        if resources.cpu_count:
            cpu = str(resources.cpu_count)
            requests["cpu"] = cpu
            limits["cpu"] = cpu

        # Memory (in bytes)
        if resources.memory:
            # "KB" is the decimal unit (1000 bytes), so KB * 1000 is the exact
            # byte count. Scaling a decimal GB value by 1024**3 would request
            # about 7% more memory than configured. round() guards against
            # float error in the division.
            memory_bytes = str(round(resources.get_memory(unit="KB") * 1000))
            requests["memory"] = memory_bytes
            limits["memory"] = memory_bytes

        # GPU (nvidia.com/gpu)
        if resources.gpu_count:
            limits["nvidia.com/gpu"] = str(resources.gpu_count)

        return {"requests": requests, "limits": limits}
class SageMakerOrchestrator(ContainerizedOrchestrator):
    def _get_instance_type(self, resources: ResourceSettings) -> str:
        """Map resources to a SageMaker instance type.

        GPU requests pick the smallest listed GPU instance with at least the
        requested number of GPUs. CPU-only requests pick the smallest listed
        instance whose vCPU and memory thresholds cover the request. Requests
        larger than every listed instance get the largest one.

        Args:
            resources: Resource settings of the step.

        Returns:
            The SageMaker instance type name.
        """
        # AWS SageMaker uses instance types
        gpu_count = resources.gpu_count or 0
        if gpu_count > 0:
            # Ascending by GPU count, so the first match is the smallest fit.
            # Matching with <= keeps a request for 2 or 3 GPUs on a GPU
            # instance instead of falling through to the CPU table.
            gpu_instances = (
                (1, "ml.p3.2xlarge"),  # 1 GPU, 8 vCPU, 61GB RAM
                (4, "ml.p3.8xlarge"),  # 4 GPUs, 32 vCPU, 244GB RAM
                (8, "ml.p3.16xlarge"),  # 8 GPUs, 64 vCPU, 488GB RAM
            )
            for available_gpus, instance_type in gpu_instances:
                if gpu_count <= available_gpus:
                    return instance_type
            # Best effort for larger requests: the largest listed GPU instance
            return gpu_instances[-1][1]

        # CPU instances
        memory_gb = resources.get_memory(unit="GB") if resources.memory else 4
        cpu_count = resources.cpu_count or 2
        if cpu_count <= 2 and memory_gb <= 4:
            return "ml.m5.large"  # 2 vCPU, 8GB RAM
        elif cpu_count <= 4 and memory_gb <= 16:
            return "ml.m5.xlarge"  # 4 vCPU, 16GB RAM
        elif cpu_count <= 8 and memory_gb <= 32:
            return "ml.m5.2xlarge"  # 8 vCPU, 32GB RAM
        else:
            return "ml.m5.4xlarge"  # 16 vCPU, 64GB RAM
Deployed Pipeline Resources
For deployed pipelines (as services), additional resource settings control scaling behavior:
Replica Configuration
from zenml.config import ResourceSettings
resources = ResourceSettings(
# Basic resources
cpu_count = 2 ,
memory = "4GB" ,
# Replica settings
min_replicas = 1 , # Minimum instances (0 allows scale-to-zero)
max_replicas = 10 , # Maximum instances (0 means no limit)
)
Autoscaling Configuration
resources = ResourceSettings(
cpu_count = 2 ,
memory = "4GB" ,
# Scaling bounds
min_replicas = 2 ,
max_replicas = 20 ,
# Autoscaling metric
autoscaling_metric = "cpu" , # Options: "cpu", "memory", "concurrency", "rps"
autoscaling_target = 75.0 , # Target 75% CPU utilization
)
Concurrency Limits
resources = ResourceSettings(
cpu_count = 4 ,
memory = "8GB" ,
# Per-instance concurrency limit
max_concurrency = 50 , # Max 50 concurrent requests per instance
# Autoscaling based on concurrency
autoscaling_metric = "concurrency" ,
autoscaling_target = 40.0 , # Scale when average concurrency > 40
)
Checking Resource Requirements
Orchestrators can check if a step needs special resources:
from zenml.orchestrators import BaseOrchestrator
from zenml.config.step_configurations import Step
class MyOrchestrator(BaseOrchestrator):
    @staticmethod
    def requires_resources_in_orchestration_environment(step: Step) -> bool:
        """Tell whether a step needs custom resources from the orchestrator.

        Args:
            step: The step to inspect.

        Returns:
            False when the step uses a step operator (resources are handled
            there) or has empty resource settings; True otherwise.
        """
        # Short-circuits: resource settings are only inspected when no step
        # operator is configured for the step.
        return (
            not step.config.step_operator
            and not step.config.resource_settings.empty
        )
This is useful for orchestrators that need to allocate resources before launching steps.
Best Practices
Start Conservative: Begin with modest resource requests and scale up based on actual usage.
Monitor Usage: Track actual resource consumption to optimize configurations.
GPU Cost Awareness: GPUs are expensive. Only request them for steps that benefit from acceleration.
Memory Buffer: Request slightly more memory than expected to handle variability.
Resource Sizing Guidelines
Data Loading Steps:
ResourceSettings(
cpu_count = 2 ,
memory = "4GB" ,
gpu_count = 0 ,
)
Training Steps (Small Models):
ResourceSettings(
cpu_count = 4 ,
memory = "8GB" ,
gpu_count = 0 , # or 1 for GPU acceleration
)
Training Steps (Large Models):
ResourceSettings(
cpu_count = 8 ,
memory = "32GB" ,
gpu_count = 1 , # or more for distributed training
)
Inference Steps:
ResourceSettings(
cpu_count = 2 ,
memory = "4GB" ,
gpu_count = 0 , # CPU inference is often sufficient
)
Validation
ZenML validates resource settings:
from zenml.config import ResourceSettings
from pydantic import ValidationError
# A consistent replica range is accepted
resources = ResourceSettings(min_replicas=2, max_replicas=10)

# An inverted range (min > max) is rejected at construction time
try:
    resources = ResourceSettings(min_replicas=10, max_replicas=2)
except ValidationError as err:
    print("Validation error:", err)
    # Error: min_replicas (10) cannot be greater than max_replicas (2)
Next Steps
Custom Orchestrators: Build orchestrators that handle resource configurations
Containerization: Package steps with their dependencies
Dynamic Pipelines: Adapt resource requirements at runtime
Custom Materializers: Handle data efficiently to optimize memory usage