Use this file to discover all available pages before exploring further.
Steps are the building blocks of ZenML pipelines. Each step is a discrete unit of work that takes inputs, performs a computation, and produces outputs. Well-designed steps make your pipelines modular, testable, and reusable.
from zenml import stepfrom typing import Annotated
3
Step 2: Define Step Function
4
Create a function with clear inputs and outputs:
5
@step
def load_data(data_path: str) -> Annotated[dict, "dataset"]:
    """Return the example dataset as a dictionary.

    Args:
        data_path: Path to the data file. The placeholder body below does
            not read from it yet.

    Returns:
        Dictionary with a ``"values"`` key holding the sample integers.
    """
    # Placeholder for your own data loading logic.
    return {"values": [1, 2, 3, 4, 5]}
6
Step 3: Add Type Hints
7
Always use type hints for inputs and outputs:
8
@step
def process_data(
    dataset: dict,
    multiplier: int = 2,
) -> Annotated[list, "processed_data"]:
    """Scale every value of the dataset by ``multiplier``.

    Args:
        dataset: Input dictionary; its ``"values"`` entry is iterated.
        multiplier: Factor applied to each value.

    Returns:
        List of the scaled values, in the original order.
    """
    return [value * multiplier for value in dataset["values"]]
9
Step 4: Use Steps in a Pipeline
10
Connect steps in your pipeline:
11
from zenml import pipeline


@pipeline
def data_pipeline(data_path: str):
    """Chain ``load_data`` and ``process_data`` into one pipeline.

    Args:
        data_path: Path forwarded to the ``load_data`` step.

    Returns:
        Output of ``process_data`` called with ``multiplier=3``.
    """
    raw_dataset = load_data(data_path)
    return process_data(raw_dataset, multiplier=3)
from typing import Annotated


@step
def train_model(
    train_data: dict,
    learning_rate: float,
) -> Annotated[object, "trained_model"]:
    """Build a stand-in model from the given learning rate.

    The ``Annotated`` return type names the output artifact
    ``trained_model``, which makes it easier to find in the ZenML dashboard.

    Args:
        train_data: Training data. The placeholder body does not use it yet.
        learning_rate: Value stored under the ``"lr"`` key of the result.

    Returns:
        Dictionary standing in for a trained model.
    """
    # Placeholder for your own training logic.
    return {"lr": learning_rate, "trained": True}
# ``Tuple`` is used in the return annotation below, so it must be imported
# alongside ``Optional`` and ``Annotated``.
from typing import Annotated, Optional, Tuple


@step
def validate_and_process(
    data: dict,
    strict: bool = False,
) -> Tuple[
    Annotated[dict, "processed_data"],
    Annotated[Optional[dict], "validation_errors"],
]:
    """Validate and process data, optionally returning errors.

    Args:
        data: Input data to validate; must contain a ``"values"`` entry.
        strict: Whether to perform strict validation.

    Returns:
        Tuple of ``(processed_data, validation_errors)``.
        ``validation_errors`` is ``None`` when validation is skipped or
        finds no problem.
    """
    errors = None
    # Validation only runs in strict mode; fewer than 10 values is an error.
    if strict and len(data["values"]) < 10:
        errors = {"error": "Insufficient data"}

    # The data is processed regardless of the validation outcome.
    processed = {"values": data["values"], "validated": True}
    return processed, errors
@step
def preprocess_text(
    text: str,
    lowercase: bool = True,
    remove_punctuation: bool = True,
    max_length: int = 1000,
) -> Annotated[str, "processed_text"]:
    """Clean up a text string according to the given options.

    Args:
        text: Input text to process.
        lowercase: Convert the text to lowercase first.
        remove_punctuation: Keep only alphanumeric and whitespace characters.
        max_length: Maximum number of characters in the result.

    Returns:
        The cleaned text, truncated to ``max_length`` characters.
    """
    cleaned = text.lower() if lowercase else text
    if remove_punctuation:
        kept_chars = [ch for ch in cleaned if ch.isalnum() or ch.isspace()]
        cleaned = "".join(kept_chars)
    return cleaned[:max_length]
from zenml import step
from zenml.config import ResourceSettings


@step(settings={"resources": ResourceSettings(cpu_count=4, memory="8GB")})
def train_large_model(data: dict) -> object:
    """Train a model that requires significant resources.

    The step requests 4 CPUs and 8GB of memory through ``ResourceSettings``.

    Args:
        data: Training data. The placeholder body does not use it yet.

    Returns:
        The trained model (a placeholder dictionary in this example).
    """
    # Placeholder so the example runs as written; replace with your own
    # training logic.
    model = {"trained": True}
    return model
@step(enable_cache=False)
def fetch_latest_data() -> Annotated[dict, "fresh_data"]:
    """Fetch fresh data on every run instead of reusing a cached result.

    Caching is disabled on the decorator, so this step executes even when
    its inputs are unchanged.

    Returns:
        Whatever ``fetch_from_api`` returns (helper not shown in this example).
    """
    fresh_data = fetch_from_api()
    return fresh_data
@step(step_operator="gpu_operator")
def train_model_on_gpu(data: dict) -> object:
    """Train a model using GPU resources.

    The step is dispatched to the step operator named ``gpu_operator``.
    NOTE(review): presumably this operator must be registered in the active
    stack - confirm against your stack configuration.

    Args:
        data: Training data. The placeholder body does not use it yet.

    Returns:
        The trained model (a placeholder dictionary in this example).
    """
    # Placeholder so the example runs as written; replace with your own
    # GPU training logic.
    model = {"trained": True}
    return model
@step
def evaluate_model(
    model: object,
    test_data: dict,
    threshold: float = 0.8,
) -> Annotated[dict, "evaluation_metrics"]:
    """Evaluate a trained model against a test dataset.

    Intended contract (the body below is still a stub): compute accuracy,
    precision, recall and F1 score, and log a warning when accuracy falls
    below ``threshold``.

    Args:
        model: Trained model to evaluate.
        test_data: Test dataset dictionary with ``'X'`` and ``'y'`` keys.
        threshold: Minimum acceptable accuracy (default: 0.8).

    Returns:
        Dictionary containing all evaluation metrics.

    Raises:
        ValueError: If ``test_data`` is empty or malformed.
    """
    # Implementation goes here.
    ...
@step
def process_user_input(
    input_data: str,
) -> Annotated[dict, "validated_data"]:
    """Process and validate user input.

    Args:
        input_data: Raw input string from the user.

    Returns:
        Dictionary with the parsed input under ``"data"`` and ``"valid"``
        set to ``True``.

    Raises:
        ValueError: If ``input_data`` is empty or whitespace-only, or if
            parsing it fails.
    """
    if not input_data or not input_data.strip():
        raise ValueError("Input data cannot be empty")

    # Keep the try body minimal: only the parsing call can fail here.
    try:
        processed = parse_input(input_data)
    except Exception as e:
        # Chain explicitly so the original traceback is kept as the cause.
        raise ValueError(f"Failed to process input: {e}") from e

    return {"data": processed, "valid": True}
import pandas as pd
from sklearn.linear_model import LogisticRegression


@step
def model_trainer(
    train_data: pd.DataFrame,
    target_column: str,
    learning_rate: float = 0.01,
    max_iter: int = 100,
) -> Annotated[LogisticRegression, "trained_model"]:
    """Train a logistic regression model.

    Args:
        train_data: Training dataset, including the target column.
        target_column: Name of the target variable.
        learning_rate: Kept for backward compatibility with existing
            callers. scikit-learn's ``LogisticRegression`` has no
            learning-rate hyperparameter, so the value is not forwarded.
        max_iter: Maximum number of solver iterations.

    Returns:
        The fitted model.
    """
    X = train_data.drop(columns=[target_column])
    y = train_data[target_column]

    # Passing ``learning_rate`` to LogisticRegression raises TypeError
    # (unexpected keyword argument), so only ``max_iter`` is forwarded.
    model = LogisticRegression(max_iter=max_iter)
    model.fit(X, y)

    print(f"Model trained with {len(X)} samples")
    return model