Kubeflow Pipelines
Kubeflow Pipelines (KFP) is a platform for building and deploying portable, scalable ML workflows based on Docker containers. It provides native support for ML artifacts, experiment tracking, and component reusability.
Why Kubeflow Pipelines?
Kubeflow Pipelines offers several advantages for ML workflows:
Component-based architecture: Reusable, containerized pipeline components
Native artifact tracking: Input/Output artifacts with lineage tracking
Pipeline versioning: Track and compare different pipeline versions
Kubernetes-native: Built for cloud-native ML workloads
Experiment management: Organize runs into experiments
Installation & Setup
Deploy Kubeflow Pipelines
Install Kubeflow Pipelines on your Kubernetes cluster:
export PIPELINE_VERSION=2.2.0
export WANDB_PROJECT=your-project
export WANDB_API_KEY=your-key
# Install cluster-scoped resources
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
# Wait for CRDs to be established
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
# Install pipeline components
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/dev?ref=$PIPELINE_VERSION"
Access UI and Storage
Forward ports to access the UI and MinIO storage:
# MinIO object storage (artifact store)
kubectl port-forward --address=0.0.0.0 svc/minio-service 9000:9000 -n kubeflow
# Kubeflow Pipelines UI
kubectl port-forward --address=0.0.0.0 svc/ml-pipeline-ui 3000:80 -n kubeflow
Access the UI at http://0.0.0.0:3000
Install Python SDK
Install the Kubeflow Pipelines SDK:
pip install kfp
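To confirm which SDK version is active in the current environment, a check along these lines can help. It is a minimal sketch that uses only the standard library; the helper name `installed_version` is illustrative and not part of the KFP SDK.

```python
from importlib import metadata
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


kfp_version = installed_version("kfp")
if kfp_version is None:
    print("kfp is not installed; run `pip install kfp`")
else:
    print(f"kfp {kfp_version} is installed")
```

The components on this page use the v2 syntax (`kfp.dsl.component`, `Input`/`Output` artifacts), so a 2.x version is expected here.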
Training Pipeline
The training pipeline uses Kubeflow’s component-based architecture with typed inputs and outputs.
Pipeline Components
Load Data Component
Train Model Component
Upload Model Component
from kfp import dsl
from kfp.dsl import Dataset, Output

IMAGE = "ghcr.io/kyryl-opens-ml/classic-example:main"

@dsl.component(base_image=IMAGE)
def load_data(
    train_data: Output[Dataset],
    val_data: Output[Dataset],
    test_data: Output[Dataset],
):
    import shutil
    from pathlib import Path
    from classic_example.data import load_sst2_data

    # Load SST-2 sentiment dataset
    load_sst2_data(Path("/app/data"))
    # Move to KFP output artifacts
    shutil.move(Path("/app/data") / "train.csv", train_data.path)
    shutil.move(Path("/app/data") / "val.csv", val_data.path)
    shutil.move(Path("/app/data") / "test.csv", test_data.path)
Key Features:
Output[Dataset]: Declares typed output artifacts
Kubeflow automatically tracks artifact lineage
Artifacts stored in MinIO and passed between components
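Because a component body only touches the `.path` attribute of its artifacts, the file-moving logic can be exercised without a cluster. The sketch below assumes a hypothetical `FakeArtifact` stand-in and a hypothetical `export_splits` helper that mirrors the moves in `load_data`; the CSV content is dummy data, not SST-2.

```python
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for a KFP artifact: only the .path attribute."""
    path: str


def export_splits(data_dir: Path, train_data, val_data, test_data) -> None:
    """Move the three CSV splits to the artifact paths, as load_data does."""
    shutil.move(str(data_dir / "train.csv"), train_data.path)
    shutil.move(str(data_dir / "val.csv"), val_data.path)
    shutil.move(str(data_dir / "test.csv"), test_data.path)


with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp) / "data"
    out_dir = Path(tmp) / "artifacts"
    data_dir.mkdir()
    out_dir.mkdir()
    # Write dummy split files in place of the real dataset
    for split in ("train", "val", "test"):
        (data_dir / f"{split}.csv").write_text("sentence,label\nok,1\n")

    outputs = {s: FakeArtifact(str(out_dir / s)) for s in ("train", "val", "test")}
    export_splits(data_dir, outputs["train"], outputs["val"], outputs["test"])
    moved = sorted(p.name for p in out_dir.iterdir())
    print(moved)
```

In a real run, KFP supplies the artifact objects and uploads whatever the component writes to each `.path`.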
from kfp.dsl import Artifact, Dataset, Input, Model, Output

@dsl.component(base_image=IMAGE)
def train_model(
    config: Output[Artifact],
    model: Output[Model],
    tokenizer: Output[Artifact],
    tokenizer_config: Output[Artifact],
    model_card: Output[Artifact],
    special_tokens_map: Output[Artifact],
    train_data: Input[Dataset],
    val_data: Input[Dataset],
    test_data: Input[Dataset],
):
    import shutil
    from pathlib import Path
    from classic_example.train import train

    # Copy input datasets to working directory
    Path("/tmp/data").mkdir(exist_ok=True)
    shutil.copy(train_data.path, Path("/tmp/data") / "train.csv")
    shutil.copy(val_data.path, Path("/tmp/data") / "val.csv")
    shutil.copy(test_data.path, Path("/tmp/data") / "test.csv")
    # Train the model
    train(config_path=Path("tests/data/test_config.json"))
    # Move trained artifacts to KFP outputs
    shutil.move("/tmp/results/config.json", config.path)
    shutil.move("/tmp/results/model.safetensors", model.path)
    shutil.move("/tmp/results/tokenizer.json", tokenizer.path)
    shutil.move("/tmp/results/tokenizer_config.json", tokenizer_config.path)
    shutil.move("/tmp/results/special_tokens_map.json", special_tokens_map.path)
    shutil.move("/tmp/results/README.md", model_card.path)
Key Features:
Input[Dataset]: Consumes outputs from previous components
Output[Model]: Special artifact type for ML models
Multiple output artifacts for complete model package
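Pairing six files with six outputs by hand is easy to get wrong, for example by sending `README.md` to the wrong artifact. One way to keep the pairing in a single place is a file-to-output mapping. This is a sketch under assumptions: `MODEL_FILES` and `export_model_files` are hypothetical names, and `SimpleNamespace` objects stand in for KFP artifacts.

```python
import shutil
import tempfile
from pathlib import Path
from types import SimpleNamespace

# File name on disk -> name of the component output that should receive it.
MODEL_FILES = {
    "config.json": "config",
    "model.safetensors": "model",
    "tokenizer.json": "tokenizer",
    "tokenizer_config.json": "tokenizer_config",
    "special_tokens_map.json": "special_tokens_map",
    "README.md": "model_card",
}


def export_model_files(results_dir: Path, outputs: dict) -> None:
    """Move each result file to the .path of its matching output artifact."""
    for file_name, output_name in MODEL_FILES.items():
        shutil.move(str(results_dir / file_name), outputs[output_name].path)


with tempfile.TemporaryDirectory() as tmp:
    results_dir = Path(tmp) / "results"
    artifact_dir = Path(tmp) / "artifacts"
    results_dir.mkdir()
    artifact_dir.mkdir()
    # Dummy files whose content is their own name, so the routing is checkable
    for file_name in MODEL_FILES:
        (results_dir / file_name).write_text(file_name)

    outputs = {
        name: SimpleNamespace(path=str(artifact_dir / name))
        for name in MODEL_FILES.values()
    }
    export_model_files(results_dir, outputs)
    model_card_content = Path(outputs["model_card"].path).read_text()
    special_tokens_content = Path(outputs["special_tokens_map"].path).read_text()

print(model_card_content, special_tokens_content)
```

Inside `train_model`, the `outputs` dictionary would be built from the component's own parameters, for example `{"config": config, "model": model, ...}`.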
@dsl.component(base_image=IMAGE)
def upload_model(
    config: Input[Artifact],
    model: Input[Model],
    tokenizer: Input[Artifact],
    tokenizer_config: Input[Artifact],
    model_card: Input[Artifact],
    special_tokens_map: Input[Artifact],
):
    import shutil
    from pathlib import Path
    from classic_example.utils import upload_to_registry

    # Reconstruct model directory from artifacts
    model_path = Path("/tmp/model")
    model_path.mkdir(exist_ok=True)
    shutil.copy(config.path, model_path / "config.json")
    shutil.copy(model.path, model_path / "model.safetensors")
    shutil.copy(tokenizer.path, model_path / "tokenizer.json")
    shutil.copy(tokenizer_config.path, model_path / "tokenizer_config.json")
    shutil.copy(special_tokens_map.path, model_path / "special_tokens_map.json")
    shutil.copy(model_card.path, model_path / "README.md")
    # Upload to W&B registry
    upload_to_registry(model_name="kfp-pipeline", model_path=model_path)
Pipeline Definition
kubeflow_pipelines/kfp_training_pipeline.py
import os

import kfp
from kfp import dsl

# W&B settings are read from the environment when the pipeline is compiled
WANDB_PROJECT = os.getenv("WANDB_PROJECT")
WANDB_API_KEY = os.getenv("WANDB_API_KEY")

@dsl.pipeline
def training_pipeline():
    # Step 1: Load datasets
    load_data_task = load_data()
    # Step 2: Train model with loaded data
    train_model_task = train_model(
        train_data=load_data_task.outputs["train_data"],
        val_data=load_data_task.outputs["val_data"],
        test_data=load_data_task.outputs["test_data"],
    )
    # Set environment variables for W&B tracking
    train_model_task = train_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    train_model_task = train_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )
    # Step 3: Upload model artifacts to registry
    upload_model_task = upload_model(
        config=train_model_task.outputs["config"],
        model=train_model_task.outputs["model"],
        tokenizer=train_model_task.outputs["tokenizer"],
        tokenizer_config=train_model_task.outputs["tokenizer_config"],
        model_card=train_model_task.outputs["model_card"],
        special_tokens_map=train_model_task.outputs["special_tokens_map"],
    )
    upload_model_task = upload_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    upload_model_task = upload_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )
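The pipeline above passes `WANDB_PROJECT` and `WANDB_API_KEY` to `set_env_variable` as plain Python values at compile time. A minimal sketch of reading them from the environment and failing early when one is missing could look like this; `require_env` is a hypothetical helper, and the `env` parameter exists only so the example can run without touching the real environment.

```python
import os
from typing import Mapping


def require_env(name: str, env: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable or fail with a clear error."""
    value = env.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} must be set before compiling")
    return value


# With the variable present, the value is returned unchanged.
project = require_env("WANDB_PROJECT", env={"WANDB_PROJECT": "your-project"})
print(project)

# With the variable missing, compilation stops before an empty value is baked in.
try:
    require_env("WANDB_API_KEY", env={})
except RuntimeError as err:
    error_message = str(err)
    print(error_message)
```

In the pipeline module this would be used as `WANDB_PROJECT = require_env("WANDB_PROJECT")`. Values given to `set_env_variable` are written into the compiled pipeline YAML, so that file should be handled as carefully as the API key itself.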
Compiling and Deploying
import uuid

def compile_pipeline() -> str:
    path = "/tmp/training_pipeline.yaml"
    kfp.compiler.Compiler().compile(training_pipeline, path)
    return path

def create_pipeline(client: kfp.Client, namespace: str):
    print("Creating experiment")
    client.create_experiment("training", namespace=namespace)
    print("Uploading pipeline")
    name = "classic-example-training"
    if client.get_pipeline_id(name) is not None:
        # Upload new version if pipeline exists
        pipeline_prev_version = client.get_pipeline(client.get_pipeline_id(name))
        version_name = f"{name}-{uuid.uuid4()}"
        pipeline = client.upload_pipeline_version(
            pipeline_package_path=compile_pipeline(),
            pipeline_version_name=version_name,
            pipeline_id=pipeline_prev_version.pipeline_id,
        )
    else:
        # Create new pipeline
        pipeline = client.upload_pipeline(
            pipeline_package_path=compile_pipeline(),
            pipeline_name=name,
        )
    print(f"pipeline {pipeline.pipeline_id}")
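The deployment scripts on this page are invoked with the KFP endpoint as a positional argument. How the original script parses that argument is not shown here, so the following is only a sketch of one possible entry point: `parse_args`, `main`, and the `--namespace` flag with its `kubeflow` default are assumptions, and `create_pipeline` refers to the function defined above.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the KFP endpoint and an optional namespace from the command line."""
    parser = argparse.ArgumentParser(description="Upload the training pipeline")
    parser.add_argument("host", help="KFP endpoint, e.g. http://0.0.0.0:3000")
    # Hypothetical flag; the default matches the namespace used for the install.
    parser.add_argument("--namespace", default="kubeflow")
    return parser.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> None:
    """Connect to KFP and upload the pipeline (needs a reachable cluster)."""
    args = parse_args(argv)
    import kfp  # imported lazily so argument parsing works without the SDK

    client = kfp.Client(host=args.host)
    create_pipeline(client, args.namespace)  # defined in the section above


# Parsing alone needs no cluster, so it can be tried directly.
args = parse_args(["http://0.0.0.0:3000"])
print(args.host, args.namespace)
```

Calling `main()` at the bottom of the script would then make `python ./kubeflow_pipelines/kfp_training_pipeline.py http://0.0.0.0:3000` upload the pipeline.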
Inference Pipeline
The inference pipeline loads a trained model and runs predictions.
Pipeline Components
Load Model Component
Run Inference Component
@dsl.component(base_image=IMAGE)
def load_model(
    config: Output[Artifact],
    model: Output[Model],
    tokenizer: Output[Artifact],
    tokenizer_config: Output[Artifact],
    model_card: Output[Artifact],
    special_tokens_map: Output[Artifact],
):
    import shutil
    from pathlib import Path
    from classic_example.utils import load_from_registry

    model_path = Path("/tmp/model")
    model_path.mkdir(exist_ok=True)
    # Download from W&B registry
    load_from_registry(model_name="kfp-pipeline:latest", model_path=model_path)
    # Export as KFP artifacts
    shutil.move(model_path / "config.json", config.path)
    shutil.move(model_path / "model.safetensors", model.path)
    shutil.move(model_path / "tokenizer.json", tokenizer.path)
    shutil.move(model_path / "tokenizer_config.json", tokenizer_config.path)
    shutil.move(model_path / "special_tokens_map.json", special_tokens_map.path)
    shutil.move(model_path / "README.md", model_card.path)
@dsl.component(base_image=IMAGE)
def run_inference(
    config: Input[Artifact],
    model: Input[Model],
    tokenizer: Input[Artifact],
    tokenizer_config: Input[Artifact],
    model_card: Input[Artifact],
    special_tokens_map: Input[Artifact],
    test_data: Input[Dataset],
    pred: Output[Dataset],
):
    import shutil
    from pathlib import Path
    from classic_example.predictor import run_inference_on_dataframe

    # Reconstruct model from artifacts
    model_path = Path("/tmp/model")
    model_path.mkdir(exist_ok=True)
    shutil.copy(config.path, model_path / "config.json")
    shutil.copy(model.path, model_path / "model.safetensors")
    shutil.copy(tokenizer.path, model_path / "tokenizer.json")
    shutil.copy(tokenizer_config.path, model_path / "tokenizer_config.json")
    shutil.copy(special_tokens_map.path, model_path / "special_tokens_map.json")
    shutil.copy(model_card.path, model_path / "README.md")
    # Run inference
    run_inference_on_dataframe(
        df_path=test_data.path,
        model_load_path=model_path,
        result_path=pred.path,
    )
Pipeline Definition
kubeflow_pipelines/kfp_inference_pipeline.py
import os

from kfp import dsl

# W&B settings are read from the environment when the pipeline is compiled
WANDB_PROJECT = os.getenv("WANDB_PROJECT")
WANDB_API_KEY = os.getenv("WANDB_API_KEY")

@dsl.pipeline
def inference_pipeline():
    # Load test data
    load_data_task = load_data()
    # Load trained model from registry
    load_model_task = load_model()
    load_model_task = load_model_task.set_env_variable(
        name="WANDB_PROJECT", value=WANDB_PROJECT
    )
    load_model_task = load_model_task.set_env_variable(
        name="WANDB_API_KEY", value=WANDB_API_KEY
    )
    # Run inference (depends on both data and model)
    run_inference(
        config=load_model_task.outputs["config"],
        model=load_model_task.outputs["model"],
        tokenizer=load_model_task.outputs["tokenizer"],
        tokenizer_config=load_model_task.outputs["tokenizer_config"],
        model_card=load_model_task.outputs["model_card"],
        special_tokens_map=load_model_task.outputs["special_tokens_map"],
        test_data=load_data_task.outputs["test_data"],
    )
Running Pipelines
Deploy Training Pipeline
python ./kubeflow_pipelines/kfp_training_pipeline.py http://0.0.0.0:3000
This compiles and uploads the training pipeline to Kubeflow.
Deploy Inference Pipeline
python ./kubeflow_pipelines/kfp_inference_pipeline.py http://0.0.0.0:3000
Trigger Runs via UI
Navigate to http://0.0.0.0:3000
Go to Pipelines → Select your pipeline
Click Create run
Configure run parameters (if any)
Click Start
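Runs can also be started from the SDK instead of the UI. The sketch below wraps `kfp.Client.create_run_from_pipeline_func`; the wrapper `start_run` is a hypothetical helper, and `RecordingClient` and `demo_pipeline` are stand-ins used here only so the example runs without a cluster.

```python
from typing import Any, Callable, Optional


def start_run(
    client: Any,
    pipeline_func: Callable,
    experiment_name: str = "training",
    arguments: Optional[dict] = None,
) -> Any:
    """Submit a one-off run of pipeline_func through a kfp.Client-like object."""
    return client.create_run_from_pipeline_func(
        pipeline_func,
        arguments=arguments or {},
        experiment_name=experiment_name,
    )


class RecordingClient:
    """Hypothetical stand-in for kfp.Client that records the submitted run."""

    def __init__(self) -> None:
        self.calls = []

    def create_run_from_pipeline_func(self, pipeline_func, arguments=None,
                                      experiment_name=None):
        self.calls.append((pipeline_func.__name__, arguments, experiment_name))
        return "submitted"


def demo_pipeline():
    """Placeholder for a function decorated with @dsl.pipeline."""


client = RecordingClient()
result = start_run(client, demo_pipeline)
print(client.calls)
```

With a real cluster, `client` would be `kfp.Client(host="http://0.0.0.0:3000")` and `pipeline_func` would be `training_pipeline` or `inference_pipeline`.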
Monitor Execution
View Graph for component dependencies
Click components to see logs and artifacts
Check Input/Output tab for artifact lineage
Artifact Management
Kubeflow Pipelines v2 supports typed artifacts:
Type | Description | Use Case
Dataset | Tabular or structured data | CSVs, DataFrames
Model | ML model artifacts | Trained models
Artifact | Generic files | Configs, logs, metadata
Metrics | Evaluation metrics | Accuracy, loss
from kfp import dsl
from kfp.dsl import Artifact, Dataset, Input, Model, Output

@dsl.component(packages_to_install=["pandas"])
def my_component(
    input_data: Input[Dataset],
    output_model: Output[Model],
    config: Output[Artifact],
):
    import pandas as pd

    # Read an input artifact through its .path attribute
    df = pd.read_csv(input_data.path)
    # Save to the output path; `model` stands for an object you trained on df
    model.save(output_model.path)
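The `Metrics` type is used a little differently from file-based artifacts: values are logged with `log_metric` rather than written to `.path`. The sketch below shows that pattern with a hypothetical `FakeMetrics` stand-in so it runs without KFP; `log_eval_metrics` is an illustrative helper, and the scores are placeholder values, not results.

```python
from typing import Dict


class FakeMetrics:
    """Hypothetical stand-in for kfp.dsl.Metrics that records logged values."""

    def __init__(self) -> None:
        self.metadata: Dict[str, float] = {}

    def log_metric(self, metric: str, value: float) -> None:
        self.metadata[metric] = value


def log_eval_metrics(metrics, scores: Dict[str, float]) -> None:
    """Log each score on a Metrics-like artifact.

    Inside a component, `metrics` would be declared as `metrics: Output[Metrics]`.
    """
    for name, value in scores.items():
        metrics.log_metric(name, float(value))


fake = FakeMetrics()
log_eval_metrics(fake, {"accuracy": 0.91, "loss": 0.27})  # placeholder values
print(fake.metadata)
```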
Kubeflow automatically tracks:
Which component produced each artifact
Which components consumed the artifact
Artifact versions across pipeline runs
Storage location in MinIO
View lineage in the UI under the Artifacts tab.
Download artifacts from MinIO:
# Install MinIO client
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
# Configure
./mc alias set kubeflow http://localhost:9000 minio minio123
# List artifacts
./mc ls kubeflow/mlpipeline/
# Download artifact
./mc cp kubeflow/mlpipeline/artifacts/... ./local-path/
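The same download can be scripted in Python. The sketch below assumes the `minio` package and uses its `list_objects` and `fget_object` calls; `download_prefix` is a hypothetical helper, and `FakeMinio` is a stand-in so the example runs without a MinIO server.

```python
import tempfile
from pathlib import Path
from types import SimpleNamespace
from typing import Any, List


def download_prefix(client: Any, bucket: str, prefix: str, dest: Path) -> List[Path]:
    """Download every object under prefix into dest, keeping relative paths."""
    saved = []
    for obj in client.list_objects(bucket, prefix=prefix, recursive=True):
        target = dest / obj.object_name
        target.parent.mkdir(parents=True, exist_ok=True)
        client.fget_object(bucket, obj.object_name, str(target))
        saved.append(target)
    return saved


class FakeMinio:
    """Hypothetical stand-in exposing the two Minio methods used above."""

    def __init__(self, names: List[str]) -> None:
        self.names = names

    def list_objects(self, bucket, prefix=None, recursive=False):
        return [SimpleNamespace(object_name=n) for n in self.names
                if n.startswith(prefix or "")]

    def fget_object(self, bucket, object_name, file_path):
        Path(file_path).write_text(f"{bucket}/{object_name}")


with tempfile.TemporaryDirectory() as tmp:
    fake = FakeMinio(["artifacts/run-1/train_data", "other/ignored"])
    saved = download_prefix(fake, "mlpipeline", "artifacts/", Path(tmp))
    saved_names = [str(p.relative_to(tmp)) for p in saved]
    saved_exists = all(p.exists() for p in saved)

print(saved_names)
```

Against the port-forwarded service, the client would be created with `Minio("localhost:9000", access_key="minio", secret_key="minio123", secure=False)`, using the same credentials as the `mc alias` command above.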
Best Practices
Component Design
Keep components focused and single-purpose
Use typed inputs/outputs for clarity
Document component parameters
Make components reusable across pipelines
Pipeline Versioning
Upload new versions instead of overwriting
Tag pipeline versions semantically
Test pipelines in separate experiments
Document breaking changes between versions
Resource Management
Set resource limits on components
Use node selectors for GPU workloads
Enable autoscaling for variable workloads
Monitor MinIO storage usage
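Resource limits are set on the task object returned when a component is called inside a pipeline. The sketch below groups the KFP v2 task setters `set_cpu_limit`, `set_memory_limit`, `set_accelerator_type`, and `set_accelerator_limit` in one hypothetical helper, `apply_training_resources`. The default values and the `nvidia.com/gpu` accelerator type are assumptions to adjust for your cluster, and `RecordingTask` is a stand-in so the example runs without KFP.

```python
def apply_training_resources(task, cpu="2", memory="8G", gpus=0,
                             gpu_type="nvidia.com/gpu"):
    """Apply CPU, memory, and optional GPU limits to a pipeline task."""
    task.set_cpu_limit(cpu)
    task.set_memory_limit(memory)
    if gpus > 0:
        task.set_accelerator_type(gpu_type)
        task.set_accelerator_limit(gpus)
    return task


class RecordingTask:
    """Hypothetical stand-in for a KFP task that records the setters called."""

    def __init__(self) -> None:
        self.settings = {}

    def set_cpu_limit(self, cpu):
        self.settings["cpu"] = cpu
        return self

    def set_memory_limit(self, memory):
        self.settings["memory"] = memory
        return self

    def set_accelerator_type(self, accelerator):
        self.settings["accelerator_type"] = accelerator
        return self

    def set_accelerator_limit(self, limit):
        self.settings["accelerator_limit"] = limit
        return self


cpu_only = apply_training_resources(RecordingTask())
with_gpu = apply_training_resources(RecordingTask(), gpus=1)
print(cpu_only.settings)
print(with_gpu.settings)
```

In `training_pipeline`, this would be called as `apply_training_resources(train_model_task, gpus=1)` right after the task is created.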
Artifact Storage
Use appropriate artifact types
Compress large artifacts (models, datasets)
Clean up old experiments periodically
Back up MinIO for production
Troubleshooting
If pipeline upload fails:
# Check KFP API server
kubectl get pods -n kubeflow | grep ml-pipeline
# Check logs
kubectl logs -n kubeflow deployment/ml-pipeline
# Verify port-forward is active
curl http://localhost:3000/apis/v2beta1/pipelines
Component Execution Errors
Debug component failures:
Click failed component in UI
View Logs tab for error messages
Check Input/Output for artifact issues
Verify base image has required dependencies
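Test component locally:
# Calling a component outside a pipeline needs a local runner (recent kfp 2.x releases)
from kfp import local
local.init(runner=local.DockerRunner())
# Run the component in its base image; requires Docker on the local machine
load_data_task = load_data()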
If artifacts aren’t passed between components:
Verify component output names match pipeline inputs
Check MinIO is running: kubectl get pods -n kubeflow | grep minio
Ensure components write to .path attribute
Verify network policies allow pod communication
Next Steps
Explore Dagster: Learn asset-centric orchestration with built-in data quality checks