Use this file to discover all available pages before exploring further.
Orchestrators are responsible for executing your pipeline steps on various infrastructure backends. Building a custom orchestrator allows you to integrate ZenML with any execution backend, from cloud platforms to on-premise clusters.
ZenML supports two execution modes:

- **Static pipelines**: The complete DAG (directed acyclic graph) is known before execution begins. Steps are submitted individually to the backend.
- **Dynamic pipelines**: The DAG can change during execution. An orchestration container runs first to determine which steps to execute.
For containerized orchestrators (most cloud platforms), inherit from ContainerizedOrchestrator:
import os
import uuid
from typing import Dict, Optional, cast

from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
from zenml.stack import Stack


class MyOrchestrator(ContainerizedOrchestrator):
    """Custom orchestrator implementation.

    NOTE: ``MyOrchestratorConfig`` is not defined in this snippet; it is
    assumed to be defined (or imported) before this class.
    """

    @property
    def config(self) -> MyOrchestratorConfig:
        """Returns the orchestrator configuration."""
        return cast(MyOrchestratorConfig, self._config)

    def get_orchestrator_run_id(self) -> str:
        """Returns the run ID of the active orchestrator run.

        This is the most critical method to implement correctly.

        Requirements:
        - Must return the SAME value for all steps in a pipeline run
        - Must be UNIQUE across different runs
        - Limited to ~250 characters (MySQL column limit)

        Returns:
            The unique orchestrator run ID.

        Raises:
            RuntimeError: If no run ID can be determined. A shared placeholder
                value is deliberately not returned, because it would make
                steps from unrelated runs look like the same run.
        """
        # Static pipelines: `submit_pipeline` sets this variable on every step
        # it launches, so all steps of one run read the same value.
        run_id = os.environ.get("MY_ORCHESTRATOR_RUN_ID")
        if run_id:
            return run_id

        # Dynamic pipelines: use a container-specific identifier.
        # This only needs to be unique within the orchestration container.
        container_id = os.environ.get("CONTAINER_ID")
        if container_id:
            return container_id

        raise RuntimeError(
            "Unable to determine orchestrator run ID: neither "
            "MY_ORCHESTRATOR_RUN_ID nor CONTAINER_ID is set."
        )

    def submit_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        base_environment: Dict[str, str],
        step_environments: Dict[str, Dict[str, str]],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a static pipeline to the orchestration backend.

        Args:
            snapshot: Pipeline snapshot with step configurations.
            stack: The stack the pipeline will run on.
            base_environment: Environment variables for all steps.
            step_environments: Per-step environment variables, keyed by
                step invocation ID.
            placeholder_run: Optional placeholder for the pipeline run.

        Returns:
            Submission result whose ``wait_for_completion`` callback waits
            for the whole run.
        """
        # One ID for the whole pipeline execution, shared by every step.
        orchestrator_run_id = self._generate_run_id()

        for invocation_id, step in snapshot.step_configurations.items():
            # Docker image built for this step.
            image = self.get_image(snapshot, step.config.name)

            # Copy so the caller's mapping is not mutated when adding the run ID.
            # NOTE(review): `base_environment` is not merged here; this assumes
            # the per-step environments already include it — confirm.
            env = step_environments[invocation_id].copy()
            env["MY_ORCHESTRATOR_RUN_ID"] = orchestrator_run_id

            resources = step.config.resource_settings

            # NOTE(review): steps are submitted in mapping order with no
            # dependency information; the backend must enforce the DAG's
            # upstream dependencies itself.
            self._submit_step(
                step_name=step.config.name,
                image=image,
                environment=env,
                cpu_count=resources.cpu_count,
                memory=resources.memory,
                gpu_count=resources.gpu_count,
            )

        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_run(orchestrator_run_id)
        )

    def submit_dynamic_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        environment: Dict[str, str],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a dynamic pipeline to the orchestration backend.

        For dynamic pipelines, submit an orchestration container that will
        determine and launch the actual pipeline steps at runtime.

        Args:
            snapshot: Pipeline snapshot.
            stack: The stack the pipeline will run on.
            environment: Environment variables for the orchestration container.
            placeholder_run: Optional placeholder for the pipeline run.

        Returns:
            Submission result whose ``wait_for_completion`` callback waits
            for the orchestration job.
        """
        # No step name is passed: this is the image for the orchestration
        # container rather than for a specific step.
        image = self.get_image(snapshot)

        job_id = self._submit_orchestration_job(
            image=image,
            environment=environment,
        )

        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_job(job_id)
        )

    def _generate_run_id(self) -> str:
        """Generate a unique run ID for the pipeline."""
        return f"run-{uuid.uuid4().hex[:12]}"

    def _submit_step(
        self,
        step_name: str,
        image: str,
        environment: Dict[str, str],
        cpu_count: Optional[float],
        memory: Optional[str],
        gpu_count: Optional[int],
    ) -> None:
        """Submit a single step to your execution backend.

        Replace with your backend's API call, e.g. boto3 for AWS,
        google.cloud for GCP, or the kubernetes client for K8s.
        """
        # Fail loudly instead of silently "submitting" nothing.
        raise NotImplementedError("Implement step submission for your backend.")

    def _submit_orchestration_job(
        self, image: str, environment: Dict[str, str]
    ) -> str:
        """Submit the orchestration container for dynamic pipelines.

        Returns:
            The backend's job ID for the orchestration container.
        """
        raise NotImplementedError(
            "Implement orchestration job submission for your backend."
        )

    def _wait_for_run(self, run_id: str) -> None:
        """Wait for a pipeline run to complete by polling your backend."""
        raise NotImplementedError("Implement run polling for your backend.")

    def _wait_for_job(self, job_id: str) -> None:
        """Wait for an orchestration job to complete."""
        raise NotImplementedError("Implement job polling for your backend.")
import os
import socket


def get_orchestrator_run_id(self) -> str:
    """Return the orchestrator run ID for a Kubernetes-based orchestrator.

    Static pipelines read the ID from the ``ZENML_KUBERNETES_RUN_ID``
    environment variable, which must already be set for the step. When it is
    absent (dynamic pipelines), the container's hostname is used instead —
    the pod name, unique per container.
    """
    preset_id = os.environ.get("ZENML_KUBERNETES_RUN_ID")
    if preset_id is not None:
        return preset_id
    return socket.gethostname()
For cloud platforms (AWS, GCP, Azure):
import json
import os


def get_orchestrator_run_id(self) -> str:
    """Return the orchestrator run ID for a cloud-platform orchestrator.

    Lookup order:
    1. The first non-empty value among the ``MY_PLATFORM_JOB_ID`` and
       ``MY_PLATFORM_EXECUTION_ARN`` environment variables.
    2. The ``job_id`` key of ``/opt/platform/config.json``, if that file
       exists.

    Returns:
        The run ID as a string.

    Raises:
        RuntimeError: If no run ID can be found by either method.
    """
    # Empty values are skipped: an empty string cannot be a unique run ID.
    for env_var in ["MY_PLATFORM_JOB_ID", "MY_PLATFORM_EXECUTION_ARN"]:
        value = os.environ.get(env_var)
        if value:
            return value

    # Fall back to config file if your platform provides one
    config_path = "/opt/platform/config.json"
    if os.path.exists(config_path):
        with open(config_path, encoding="utf-8") as f:
            config = json.load(f)
        job_id = config.get("job_id")
        if job_id not in (None, ""):
            # JSON may encode the ID as a number; callers expect a string.
            return str(job_id)

    raise RuntimeError("Unable to determine orchestrator run ID")
from zenml import pipeline, step


@step
def load_data() -> int:
    """Return a constant input value."""
    return 42


@step
def process_data(data: int) -> int:
    """Return the input value doubled."""
    return data * 2


@pipeline
def test_pipeline():
    """Two-step pipeline used to smoke-test a custom orchestrator."""
    data = load_data()
    process_data(data)


# The orchestrator is chosen by the active stack, not per pipeline call
# (`with_options` has no `orchestrator` argument). Register your flavor,
# an orchestrator instance, and a stack that uses it, then activate it, e.g.:
#   zenml orchestrator flavor register <module.path.MyOrchestratorFlavor>
#   zenml orchestrator register my_orchestrator --flavor=<your_flavor_name>
#   zenml stack register my_stack -o my_orchestrator -a default --set
if __name__ == "__main__":
    # Run with your orchestrator (via the active stack)
    test_pipeline()