Dagster Pipelines
Dagster is a data orchestration platform that focuses on assets rather than tasks. It treats data, models, and metrics as first-class citizens with built-in lineage tracking, data quality checks, and observability.
Why Dagster?
Dagster’s asset-centric approach offers unique advantages:
Asset-first paradigm : Define what you want to produce, not just tasks
Data quality checks : Built-in asset checks for validation and anomaly detection
Rich metadata : Attach JSON, markdown, plots to assets for observability
Software-defined assets : Assets are code, enabling testing and version control
Flexible execution : Run locally, on Modal, Kubernetes, or other executors
Setup
Configure Environment
Set up the Dagster home directory and credentials:
mkdir ./dagster_pipelines/dagster-home
export DAGSTER_HOME=$PWD/dagster_pipelines/dagster-home
export WANDB_PROJECT=your-project
export WANDB_API_KEY=your-key
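Before launching Dagster it can help to confirm that these variables are actually set in the current shell. The following is a minimal sketch, not part of the original pipeline; the helper name `missing_env_vars` is hypothetical, and the variable names are the ones exported above.

```python
import os
from collections.abc import Mapping, Sequence

# Variable names taken from the export commands in this section.
REQUIRED_VARS = ("DAGSTER_HOME", "WANDB_PROJECT", "WANDB_API_KEY")


def missing_env_vars(
    required: Sequence[str] = REQUIRED_VARS,
    environ: Mapping[str, str] = os.environ,
) -> list[str]:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    return [name for name in required if not environ.get(name)]
```

Calling `missing_env_vars()` with no arguments inspects the real environment; passing a plain dict makes the helper easy to test.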
Install Dependencies
Install Dagster and required packages:
pip install dagster dagster-webserver
pip install datasets transformers peft trl evaluate
pip install modal # For remote execution
Deploy Modal Functions (Optional)
Deploy training functions to Modal for GPU execution:
MODAL_FORCE_BUILD=1 modal deploy ./dagster_pipelines/text2sql_functions.py
Start Dagster UI
Launch the Dagster web interface:
dagster dev -f dagster_pipelines/text2sql_pipeline.py -p 3000 -h 0.0.0.0
Open http://localhost:3000 in a browser. The -h 0.0.0.0 flag makes the server listen on all network interfaces.
Text-to-SQL Pipeline
This example demonstrates fine-tuning a Phi-3 model for text-to-SQL generation with comprehensive data quality checks.
Asset: Load SQL Data
dagster_pipelines/text2sql_pipeline.py
from dagster import asset, AssetExecutionContext, MetadataValue
from datasets import DatasetDict, load_dataset
from random import randint


@asset(group_name="data", compute_kind="python")
def load_sql_data(context: AssetExecutionContext):
    """Load and subsample the SQL context dataset."""
    dataset_name = "b-mc2/sql-create-context"
    dataset = load_dataset(dataset_name, split="train")

    # Subsample to 10% for faster training
    subsample = 0.1
    dataset = dataset.shuffle(seed=42).select(
        range(int(len(dataset) * subsample))
    )

    # Split into train/test
    datasets = dataset.train_test_split(test_size=0.05, seed=42)

    # Log metadata to Dagster UI.
    # randint is inclusive on both ends, so the upper bound is len - 1.
    context.add_output_metadata(
        {
            "len_train": MetadataValue.int(len(datasets["train"])),
            "len_test": MetadataValue.int(len(datasets["test"])),
            "sample_train": MetadataValue.json(
                datasets["train"][randint(0, len(datasets["train"]) - 1)]
            ),
            "sample_test": MetadataValue.json(
                datasets["test"][randint(0, len(datasets["test"]) - 1)]
            ),
        }
    )
    return datasets
Key Features:
@asset decorator defines a software-defined asset
context.add_output_metadata() attaches rich metadata visible in UI
Returns DatasetDict passed to downstream assets
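A sample row attached as metadata has to be drawn with an index strictly below the split length. The sketch below, using only the standard library, shows one way to do that; `pick_sample` is a hypothetical helper and not part of the pipeline file.

```python
import random


def pick_sample(rows, rng=random):
    """Return one random element of a non-empty sequence (hypothetical helper)."""
    if len(rows) == 0:
        raise ValueError("cannot sample from an empty sequence")
    # randrange(n) yields 0..n-1, so the index is always valid.
    # randint(0, n) can also yield n, which is one past the last row.
    return rows[rng.randrange(len(rows))]
```

Passing a seeded `random.Random` instance as `rng` makes the sampled row reproducible across materializations.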
Asset Check: No Empty Datasets
from dagster import asset_check, AssetCheckResult


@asset_check(asset=load_sql_data)
def no_empty(load_sql_data):
    """Check that train and test datasets are not empty."""
    train_no_empty = len(load_sql_data["train"]) != 0
    test_no_empty = len(load_sql_data["test"]) != 0
    return AssetCheckResult(passed=train_no_empty and test_no_empty)
Asset checks run automatically when materializing assets:
passed=True/False indicates check result
Failures don’t stop execution but are highlighted in UI
Useful for data quality, schema validation, anomaly detection
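A check is easier to debug when it reports what it measured. The framework-free sketch below computes the pass/fail flag together with per-split row counts; `non_empty_report` is a hypothetical helper. Inside an asset check, the counts could be passed to `AssetCheckResult` through its `metadata` argument.

```python
from collections.abc import Mapping, Sized


def non_empty_report(splits: Mapping[str, Sized]) -> tuple[bool, dict[str, int]]:
    """Return (passed, row count per split) for a mapping of dataset splits."""
    counts = {name: len(rows) for name, rows in splits.items()}
    # An empty mapping counts as a failure: there is nothing to train on.
    passed = bool(counts) and all(n > 0 for n in counts.values())
    return passed, counts
```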
Asset: Process Dataset
from functools import partial
from transformers import AutoTokenizer


def create_message_column(row):
    """Format SQL data as chat messages."""
    messages = [
        {"content": f"{row['context']}\n Input: {row['question']}", "role": "user"},
        {"content": f"{row['answer']}", "role": "assistant"},
    ]
    return {"messages": messages}


def format_dataset_chatml(row, tokenizer):
    """Apply chat template to messages."""
    return {
        "text": tokenizer.apply_chat_template(
            row["messages"], add_generation_prompt=False, tokenize=False
        )
    }


@asset(group_name="data", compute_kind="python")
def process_dataset(context: AssetExecutionContext, load_sql_data) -> DatasetDict:
    """Preprocess dataset with chat templates."""
    model_id = "microsoft/Phi-3-mini-4k-instruct"
    dataset = load_sql_data
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    tokenizer.padding_side = "right"

    # Apply transformations
    dataset_chatml = dataset.map(create_message_column)
    dataset_chatml = dataset_chatml.map(
        partial(format_dataset_chatml, tokenizer=tokenizer)
    )

    # randint is inclusive on both ends, so the upper bound is len - 1.
    context.add_output_metadata(
        {
            "len_train": MetadataValue.int(len(dataset_chatml["train"])),
            "len_test": MetadataValue.int(len(dataset_chatml["test"])),
            "sample_train": MetadataValue.json(
                dataset_chatml["train"][randint(0, len(dataset_chatml["train"]) - 1)]
            ),
        }
    )
    return dataset_chatml
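To see what the two mapping steps produce, the sketch below runs the message formatting on one illustrative row. The row contents are made up for the example, and `render_chat` is a hypothetical stand-in for `tokenizer.apply_chat_template`: its tag format is an assumption and is not the template shipped with the Phi-3 tokenizer.

```python
def create_message_column(row):
    """Format SQL data as chat messages (same logic as the pipeline helper)."""
    messages = [
        {"content": f"{row['context']}\n Input: {row['question']}", "role": "user"},
        {"content": f"{row['answer']}", "role": "assistant"},
    ]
    return {"messages": messages}


def render_chat(messages):
    """Hypothetical stand-in for a chat template; the real one comes from the tokenizer."""
    return "".join(f"<|{m['role']}|>\n{m['content']}<|end|>\n" for m in messages)


# Illustrative row with the three columns the pipeline reads.
row = {
    "context": "CREATE TABLE head (age INTEGER)",
    "question": "How many heads of the departments are older than 56?",
    "answer": "SELECT COUNT(*) FROM head WHERE age > 56",
}

out = create_message_column(row)
text = render_chat(out["messages"])
print(text)
```

The user turn carries the schema and the question, and the assistant turn carries the target SQL, which is the text the model is fine-tuned to produce.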
Asset: Trained Model
import modal


@asset(group_name="model", compute_kind="modal")
def trained_model(process_dataset):
    """Train Phi-3 model on Modal with GPU."""
    # Convert to pandas for serialization to Modal
    process_dataset_pandas = {
        "train": process_dataset["train"].to_pandas(),
        "test": process_dataset["test"].to_pandas(),
    }

    # Call remote Modal function
    model_training_job = modal.Function.lookup(
        "ml-in-production-practice-dagster-pipeline", "training_job"
    )
    model_name, uri = model_training_job.remote(
        dataset_chatml_pandas=process_dataset_pandas
    )
    return model_name
Remote Execution with Modal:
compute_kind="modal" indicates execution environment
modal.Function.lookup() calls deployed Modal function
Training runs on Modal’s GPU infrastructure
Returns model name for downstream inference
Modal Training Function
dagster_pipelines/text2sql_functions.py
import os
import modal
from modal import Image

app = modal.App("ml-in-production-practice-dagster-pipeline")

env = {
    "WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
    "WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
}
custom_image = Image.from_registry(
    "ghcr.io/kyryl-opens-ml/dagster-pipeline:main"
).env(env)
timeout = 10 * 60 * 60  # 10 hours


@app.function(image=custom_image, gpu="a10g", timeout=timeout)
def training_job(dataset_chatml_pandas):
    """Train model on Modal GPU."""
    from datasets import Dataset
    from text2sql_pipeline import train_model

    # Convert back to HuggingFace Dataset
    dataset_chatml = {
        "train": Dataset.from_pandas(dataset_chatml_pandas["train"]),
        "test": Dataset.from_pandas(dataset_chatml_pandas["test"]),
    }
    model_name, uri = train_model(dataset_chatml=dataset_chatml)
    return model_name, uri


@app.function(image=custom_image, gpu="a10g", timeout=timeout)
def evaluation_job(df, model_name):
    """Evaluate model on Modal GPU."""
    from text2sql_pipeline import evaluate_model

    metrics = evaluate_model(df=df, model_name=model_name)
    return metrics
Asset: Model Metrics
import modal


@asset(group_name="model", compute_kind="modal")
def model_metrics(context: AssetExecutionContext, trained_model, process_dataset):
    """Evaluate trained model on test set."""
    # Call remote evaluation job
    model_evaluate_job = modal.Function.lookup(
        "ml-in-production-practice-dagster-pipeline", "evaluation_job"
    )
    metrics = model_evaluate_job.remote(
        df=process_dataset["test"].to_pandas(),
        model_name=trained_model,
    )
    context.add_output_metadata(
        {
            "results": MetadataValue.json(metrics),
        }
    )
    return metrics
Asset Checks: ROUGE Thresholds
@asset_check(asset=model_metrics)
def rouge1_check(model_metrics):
    """Check ROUGE-1 score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rouge1"] > 0.8))


@asset_check(asset=model_metrics)
def rouge2_check(model_metrics):
    """Check ROUGE-2 score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rouge2"] > 0.8))


@asset_check(asset=model_metrics)
def rougeL_check(model_metrics):
    """Check ROUGE-L score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rougeL"] > 0.8))


@asset_check(asset=model_metrics)
def rougeLsum_check(model_metrics):
    """Check ROUGE-Lsum score exceeds threshold."""
    return AssetCheckResult(passed=bool(model_metrics["rougeLsum"] > 0.8))
Multiple checks validate model quality:
Each check evaluates a different ROUGE metric
Failed checks indicate model performance issues
Visible in UI as warnings without blocking execution
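All four checks apply the same rule, a strict greater-than comparison against 0.8. The sketch below expresses that rule once as a plain function so the thresholds live in a single table; `check_thresholds` and `ROUGE_THRESHOLDS` are hypothetical names, and the metric values in the usage lines are invented for illustration.

```python
def check_thresholds(metrics, thresholds) -> dict[str, bool]:
    """Map each metric name to True when its value strictly exceeds its threshold."""
    results = {}
    for name, minimum in thresholds.items():
        if name not in metrics:
            raise KeyError(f"metric {name!r} missing from evaluation output")
        results[name] = bool(metrics[name] > minimum)
    return results


# Thresholds mirror the 0.8 cut-off used by the checks in this section.
ROUGE_THRESHOLDS = {"rouge1": 0.8, "rouge2": 0.8, "rougeL": 0.8, "rougeLsum": 0.8}

# Made-up metric values, only to show the shape of the result.
example = check_thresholds(
    {"rouge1": 0.91, "rouge2": 0.75, "rougeL": 0.85, "rougeLsum": 0.8},
    ROUGE_THRESHOLDS,
)
print(example)
```

Because the comparison is strict, a score of exactly 0.8 does not pass.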
Pipeline Definition
from dagster import Definitions

defs = Definitions(
    assets=[
        load_sql_data,
        process_dataset,
        trained_model,
        model_metrics,
    ],
    asset_checks=[
        no_empty,
        rouge1_check,
        rouge2_check,
        rougeL_check,
        rougeLsum_check,
    ],
)
Definitions object registers all assets and checks with Dagster.
Running the Pipeline
In the Dagster UI:
Navigate to Assets view
Select all assets (or click Materialize all)
Click Materialize selected
Monitor execution in Runs view
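Via CLI (run from the dagster_pipelines directory so the module can be imported; --select is required, and "*" selects every asset):
dagster asset materialize -m text2sql_pipeline --select "*"
Materialize only certain assets (multiple assets are comma-separated):
# Materialize data assets only
dagster asset materialize -m text2sql_pipeline --select "load_sql_data,process_dataset"
# Materialize only the model asset (its upstream assets must already be materialized)
dagster asset materialize -m text2sql_pipeline --select trained_model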
In the UI:
Go to Asset Lineage view
See dependency graph:
load_sql_data → process_dataset → trained_model → model_metrics
Click assets to view:
Materialization history
Metadata (samples, metrics)
Asset checks
Lineage upstream/downstream
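The lineage graph follows from the asset function signatures: a parameter named after another asset declares a dependency on it. The sketch below mimics that idea with the standard library only. It is an illustration of the naming convention, not Dagster's implementation, and the stub functions merely copy the parameter lists used in this pipeline.

```python
import inspect
from graphlib import TopologicalSorter


# Stubs with the same parameter names as the pipeline's assets.
def load_sql_data(context): ...
def process_dataset(context, load_sql_data): ...
def trained_model(process_dataset): ...
def model_metrics(context, trained_model, process_dataset): ...


def dependency_graph(functions):
    """Map each function name to the set of parameters that name another function."""
    names = {fn.__name__ for fn in functions}
    return {
        fn.__name__: {p for p in inspect.signature(fn).parameters if p in names}
        for fn in functions
    }


graph = dependency_graph([load_sql_data, process_dataset, trained_model, model_metrics])
# static_order() lists every node after all of its dependencies.
order = list(TopologicalSorter(graph).static_order())
print(order)
```

Note that `model_metrics` depends on both `trained_model` and `process_dataset`, so the real graph has one more edge than the linear chain suggests. The `context` parameter is ignored because no asset has that name.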
Asset Checks vs Assertions
Asset Checks (recommended):
Non-blocking: execution continues even if checks fail
Visible in UI with status indicators
Can be run independently of materialization
Support rich failure messages
Assertions (alternative):
Blocking: raise exceptions to halt execution
Useful for critical validations
Example:
@asset
def my_asset(upstream):
    assert len(upstream) > 0, "Empty dataset!"
    return process(upstream)  # process() stands for your own transformation
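One caveat with `assert` as a blocking validation: Python removes assert statements when it runs with the `-O` flag, so the guard can silently disappear. A minimal sketch of an explicit alternative follows; `require_non_empty` is a hypothetical helper name.

```python
def require_non_empty(rows, name="dataset"):
    """Raise ValueError when rows is empty; return rows unchanged otherwise."""
    if len(rows) == 0:
        raise ValueError(f"{name} is empty")
    return rows
```

Inside an asset, `require_non_empty(upstream)` would halt the run in the same way as the assertion, regardless of interpreter flags.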
Organize assets with group_name:
@asset(group_name="data")
def load_data(): ...

@asset(group_name="model")
def train_model(): ...
Groups appear in UI for better organization:
data : Data loading and preprocessing
model : Training and evaluation
inference : Prediction and serving
Best Practices
Asset Design
Define assets by what they produce, not how
Keep assets pure functions when possible
Use descriptive names (e.g., cleaned_data, not step_2)
Group related assets logically
Data Quality
Add asset checks for critical validations
Use checks for schema validation, null checks, ranges
Set meaningful thresholds (don’t over-check)
Document why checks exist in docstrings
Metadata
Attach metadata for observability
Log samples, counts, metrics, distributions
Use markdown for human-readable summaries
Include links to external systems (W&B, MLflow)
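As an example of a human-readable summary, the sketch below renders a metrics dictionary as a Markdown table. The helper name `metrics_to_markdown` is hypothetical; in an asset, the resulting string could be wrapped in `MetadataValue.md(...)` and passed to `context.add_output_metadata`.

```python
def metrics_to_markdown(metrics: dict[str, float]) -> str:
    """Render metric names and values as a two-column Markdown table."""
    lines = ["| metric | value |", "| --- | --- |"]
    for name in sorted(metrics):
        lines.append(f"| {name} | {metrics[name]:.3f} |")
    return "\n".join(lines)
```

Sorting the metric names keeps the table stable between runs, which makes materializations easier to compare.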
Resource Management
Use compute_kind to indicate execution environment
Offload heavy compute to Modal, Kubernetes, etc.
Partition large assets for incremental processing
Configure retries for flaky dependencies
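Dagster provides a `RetryPolicy` that can be attached to an asset for run-level retries. For a single flaky call inside an asset, such as a network request, the idea can also be sketched by hand. The helper below is a hypothetical illustration with exponential backoff, and its parameter names and defaults are assumptions.

```python
import time


def call_with_retries(fn, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn() up to `attempts` times, doubling the delay after each failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))
```

The `sleep` argument is injectable so that tests can record delays instead of waiting.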
Comparison with Other Orchestrators
Dagster advantages over Airflow:
Asset-centric: focuses on data products, not tasks
Built-in data quality checks
Rich metadata and observability
Better local development experience
Airflow Advantages:
More mature ecosystem
Broader community and integrations
Battle-tested at scale
More flexible scheduling options
Dagster advantages over Kubeflow:
Not Kubernetes-specific
Simpler local testing
Asset checks for data quality
Better for data pipelines beyond ML
Kubeflow Advantages:
Native Kubernetes integration
ML-focused features (hyperparameter tuning, etc.)
Strong artifact lineage in KFP v2
Integrates with Kubeflow ecosystem
Troubleshooting
Modal Functions Not Found
If modal.Function.lookup() fails:
# List deployed Modal apps
modal app list
# Check functions in app
modal app show ml-in-production-practice-dagster-pipeline
# Redeploy if needed
MODAL_FORCE_BUILD=1 modal deploy ./dagster_pipelines/text2sql_functions.py
Asset Materialization Fails
Debug failed materializations:
Click failed asset in Runs view
Expand Logs to see error traceback
Check Compute Logs for stdout/stderr
Verify upstream assets materialized successfully
Test asset function locally:
from text2sql_pipeline import load_sql_data
from dagster import build_asset_context
result = load_sql_data(build_asset_context())
print(result)
If checks consistently fail:
Review threshold values (e.g., ROUGE > 0.8 may be too strict)
Check if asset returns expected format
Add logging in check functions to debug
Consider making checks warnings instead of failures
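One way to soften a strict threshold is a two-tier rule: scores below a hard floor fail, scores between the floor and the target only warn. The sketch below shows the rule as plain Python; `classify` is a hypothetical helper and the 0.6 floor in the usage line is an invented value. In Dagster, the "warn" tier would correspond to returning `AssetCheckResult(passed=False, severity=AssetCheckSeverity.WARN)`.

```python
def classify(value, warn_below, fail_below):
    """Return 'fail', 'warn', or 'pass' for a higher-is-better metric."""
    if fail_below > warn_below:
        raise ValueError("fail_below must not exceed warn_below")
    if value < fail_below:
        return "fail"
    if value < warn_below:
        return "warn"
    return "pass"


# Target of 0.8 as in the ROUGE checks; the 0.6 floor is illustrative only.
print(classify(0.7, warn_below=0.8, fail_below=0.6))
```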
Next Steps
Practice Exercises: complete hands-on exercises to build your own orchestration pipelines.