> **Documentation index:** Fetch the complete documentation index at https://mintlify.com/zenml-io/zenml/llms.txt.
> Use that file to discover all available pages before exploring further.
Orchestrating AI agents requires different patterns than traditional ML pipelines. This guide covers the essential techniques for building robust, reproducible agent workflows.
Agent pipeline anatomy
A typical agent pipeline consists of three core components: a step that runs the agent, a step that formats its output, and a pipeline that connects the two:
from zenml import pipeline, step
from typing import Annotated, Dict, Any
@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "agent_results"]:
    """Run the agent on ``query`` and return its answer with run metrics.

    Args:
        query: Text handed to the agent's ``process`` method.

    Returns:
        A dict with the keys ``response``, ``confidence``, ``latency_ms``
        and ``tokens_used``, copied from the agent's result object.
    """
    outcome = initialize_agent().process(query)
    payload: Dict[str, Any] = {"response": outcome.text}
    payload["confidence"] = outcome.confidence
    payload["latency_ms"] = outcome.latency_ms
    payload["tokens_used"] = outcome.tokens_used
    return payload
@step
def format_response(
    results: Dict[str, Any]
) -> Annotated[str, "formatted_response"]:
    """Format agent output for consumption.

    Only ``response`` is required. The ``confidence``, ``latency_ms`` and
    ``tokens_used`` lines are emitted when the corresponding key is present,
    so result dicts that carry only a subset of the metrics can be formatted
    without raising ``KeyError``. When all four keys are present the output
    is a four-line report (Response / Confidence / Latency / Tokens).

    Args:
        results: Agent result dict; must contain ``response``.

    Returns:
        The report as a newline-separated string.

    Raises:
        KeyError: If ``results`` has no ``response`` key.
    """
    lines = [f"Response: {results['response']}"]
    if "confidence" in results:
        lines.append(f"Confidence: {results['confidence']:.2f}")
    if "latency_ms" in results:
        lines.append(f"Latency: {results['latency_ms']:.1f} ms")
    if "tokens_used" in results:
        lines.append(f"Tokens: {results['tokens_used']}")
    return "\n".join(lines)
@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Run the agent on ``query`` and return the formatted answer.

    Args:
        query: Question forwarded to ``run_agent``.

    Returns:
        The output of ``format_response`` for the agent's results.
    """
    agent_output = run_agent(query)
    return format_response(agent_output)
Key principles
- **Annotated outputs**: Use `Annotated[Type, "artifact_name"]` to track all outputs as versioned artifacts.
- **Structured results**: Return dictionaries or Pydantic models, not just strings.
- **Error handling**: Wrap agent calls in try-except blocks and record a status field in the result.
- **Metadata capture**: Log execution details (latency, tokens, costs) for analysis.
Deployment patterns
Local execution
Run pipelines locally for development and testing:
# Script entry point: runs the pipeline twice, first with the default query
# and then with an explicit one.
if __name__ == "__main__":
    # Run with default parameters
    agent_pipeline()
    # Run with custom query
    agent_pipeline(query="What's the weather in Berlin?")
HTTP deployment
Deploy as a production HTTP service:
from zenml.config import DeploymentSettings, CORSConfig
# Settings attached to the pipeline under the "deployment" settings key.
deployment_settings = DeploymentSettings(
    app_title="Customer Support Agent",
    # NOTE(review): "*" permits requests from every origin; confirm this is
    # intended before exposing the service publicly.
    cors=CORSConfig(allow_origins=["*"]),
    dashboard_files_path="ui",  # Serve web UI
)
@pipeline(
    enable_cache=False,
    settings={"deployment": deployment_settings},
)
def agent_api(query: str) -> str:
    """Pipeline variant configured with the deployment settings.

    Caching is disabled on this pipeline, so each invocation re-runs the
    steps.

    Args:
        query: Question forwarded to ``run_agent``.

    Returns:
        The formatted agent response.
    """
    agent_output = run_agent(query)
    formatted = format_response(agent_output)
    return formatted
Deploy and invoke:
# Deploy the pipeline
zenml pipeline deploy agent_api --name support-agent

# Get endpoint URL
zenml deployment describe support-agent

# Invoke via CLI. There must be no space after "=": with a space the shell
# passes an empty --query value and the question as a separate argument.
zenml deployment invoke support-agent --query="How do I return an item?"

# Invoke via HTTP
curl -X POST http://localhost:8000/invoke \
  -H "Content-Type: application/json" \
  -d '{"parameters": {"query": "How do I return an item?"}}'
Docker configuration
Package agents with their dependencies:
from zenml.config import DockerSettings
# Docker build settings for the agent image: dependencies come from
# requirements.txt and are installed with uv.
docker_settings = DockerSettings(
    requirements="requirements.txt",
    python_package_installer="uv",  # Fast installs
    # Each value is a ${NAME} placeholder with no whitespace inside or around
    # it, so the variable reference is well-formed.
    # NOTE(review): presumably these are resolved from the caller's
    # environment rather than stored literally; confirm against the ZenML
    # DockerSettings documentation.
    environment={
        "OPENAI_API_KEY": "${OPENAI_API_KEY}",
        "LANGFUSE_PUBLIC_KEY": "${LANGFUSE_PUBLIC_KEY}",
        "LANGFUSE_SECRET_KEY": "${LANGFUSE_SECRET_KEY}",
    },
)
@pipeline(settings={"docker": docker_settings})
def agent_pipeline(query: str) -> str:
    """Agent pipeline configured with the Docker settings.

    Args:
        query: Question forwarded to ``run_agent``.

    Returns:
        The formatted agent response.
    """
    agent_output = run_agent(query)
    return format_response(agent_output)
Multi-agent orchestration
Routing pattern
Route queries to specialized agents:
@step
def classify_intent(query: str) -> Annotated[str, "intent"]:
    """Pick the specialist for ``query`` by case-insensitive keyword match.

    Routes are checked in order (returns, billing, technical) and the first
    one with a keyword occurring as a substring of the query wins.

    Args:
        query: The user's question.

    Returns:
        ``"returns_specialist"``, ``"billing_specialist"``,
        ``"technical_support"``, or ``"general_support"`` when no keyword
        matches.
    """
    routes = (
        ("returns_specialist", ("return", "refund")),
        ("billing_specialist", ("billing", "payment")),
        ("technical_support", ("technical", "setup")),
    )
    text = query.lower()
    for specialist, keywords in routes:
        for keyword in keywords:
            if keyword in text:
                return specialist
    return "general_support"
@step
def run_specialist(
    query: str, intent: str
) -> Annotated[Dict[str, Any], "specialist_response"]:
    """Execute the specialist agent selected by ``intent``.

    Only the selected agent is instantiated. An unrecognized ``intent``
    falls back to the general-support agent, and the returned ``specialist``
    field names the agent that actually handled the query.

    Args:
        query: The user's question.
        intent: Routing label, e.g. as produced by ``classify_intent``.

    Returns:
        A dict with the keys ``response``, ``specialist`` and ``confidence``.
    """
    # Map labels to classes (not instances) so unused agents are never built.
    agent_classes = {
        "returns_specialist": ReturnsAgent,
        "billing_specialist": BillingAgent,
        "technical_support": TechnicalAgent,
        "general_support": GeneralAgent,
    }
    specialist = intent if intent in agent_classes else "general_support"
    agent = agent_classes[specialist]()
    result = agent.process(query)
    return {
        "response": result.text,
        "specialist": specialist,
        "confidence": result.confidence,
    }
@pipeline
def multi_agent_pipeline(query: str) -> str:
    """Classify the query, run the matching specialist, format the answer.

    Args:
        query: The user's question.

    Returns:
        The formatted specialist response.
    """
    routed_intent = classify_intent(query)
    specialist_output = run_specialist(query, routed_intent)
    formatted = format_response(specialist_output)
    return formatted
CrewAI integration
Orchestrate agent crews:
from crewai import Agent, Crew, Task
from crewai.tools import tool
# Placeholder tool: returns the same fixed conditions for every city.
# The docstring is left unchanged because tool decorators commonly use it as
# the tool description.
@tool("Weather Checker Tool")
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"Current weather in {city}: Sunny, 22°C"
# Two-agent crew: a weather specialist (with the weather tool) and a travel
# advisor. "{city}" in the strings below is a placeholder matching the
# "city" key passed via crew.kickoff(inputs=...).
# NOTE(review): the trailing space after "{city}" in these strings looks like
# a formatting artifact; confirm before removing it.
weather_checker = Agent(
    role="Weather Specialist",
    goal="Check weather conditions for {city} ",
    backstory="Expert meteorologist",
    tools=[get_weather],
    verbose=True,
)
travel_advisor = Agent(
    role="Travel Advisor",
    goal="Give travel advice based on weather for {city} ",
    backstory="Experienced travel consultant",
    verbose=True,
)
check_weather_task = Task(
    description="Check weather in {city} ",
    expected_output="Weather report",
    agent=weather_checker,
)
# The packing task receives the weather task as context.
packing_advice_task = Task(
    description="Provide packing advice for {city} ",
    expected_output="List of items to pack",
    agent=travel_advisor,
    context=[check_weather_task],
)
crew = Crew(
    agents=[weather_checker, travel_advisor],
    tasks=[check_weather_task, packing_advice_task],
    verbose=True,
)
@step
def run_crew(city: str) -> Annotated[Dict[str, Any], "crew_results"]:
    """Kick off the module-level crew for ``city``.

    Args:
        city: Value supplied to the crew as the ``city`` input.

    Returns:
        A dict with ``city``, the crew output converted to ``str`` under
        ``result``, and ``status`` set to ``"success"``.
    """
    crew_output = crew.kickoff(inputs={"city": city})
    summary: Dict[str, Any] = {"city": city}
    summary["result"] = str(crew_output)
    summary["status"] = "success"
    return summary
@pipeline
def travel_planning_pipeline(city: str = "Berlin") -> str:
    """Run the travel crew for ``city`` and return the formatted results.

    Args:
        city: Destination forwarded to ``run_crew``.

    Returns:
        The output of ``format_travel_results`` for the crew results.
    """
    # NOTE(review): format_travel_results is not defined in this guide; it
    # must be provided alongside this pipeline.
    return format_travel_results(run_crew(city))
LangGraph workflows
Build stateful agent workflows:
from typing import List, TypedDict

from langchain_core.messages import BaseMessage, HumanMessage
from langgraph.graph import END, START, StateGraph
class AgentState(TypedDict):
    """State for customer service workflow."""

    # Conversation messages; the workflow nodes read the last one as the query.
    messages: List[BaseMessage]
    # Intent label written by the classification node
    # ("returns", "billing" or "general").
    query_type: str
    # Confidence score written by the analysis and validation nodes.
    confidence: float
    # Reply text written by the generation node (and replaced by the
    # validation node when too short).
    response_text: str
class CustomerServiceAgent:
    """Customer-service agent implemented as a linear LangGraph workflow.

    The compiled graph runs four nodes in sequence: analyze the query,
    classify its intent, generate a response, and validate that response.
    """

    def __init__(self):
        self.graph = self._build_graph()

    def _build_graph(self):
        """Assemble the four-node workflow and return the compiled graph."""
        builder = StateGraph(AgentState)
        stages = (
            ("analyze_query", self._analyze_query),
            ("classify_intent", self._classify_intent),
            ("generate_response", self._generate_response),
            ("validate_response", self._validate_response),
        )
        for stage_name, handler in stages:
            builder.add_node(stage_name, handler)
        # Chain START -> every stage in declaration order -> END.
        path = [START, *(stage_name for stage_name, _ in stages), END]
        for source, target in zip(path, path[1:]):
            builder.add_edge(source, target)
        return builder.compile()

    def _analyze_query(self, state: AgentState) -> AgentState:
        """Set confidence from query length: 0.9 under ten words, else 0.8."""
        text = state["messages"][-1].content
        word_count = len(text.split())
        if word_count < 10:
            state["confidence"] = 0.9
        else:
            state["confidence"] = 0.8
        return state

    def _classify_intent(self, state: AgentState) -> AgentState:
        """Label the query as returns, billing, or general by keyword."""
        text = state["messages"][-1].content.lower()
        if any(term in text for term in ("return", "refund")):
            label = "returns"
        elif any(term in text for term in ("billing", "payment")):
            label = "billing"
        else:
            label = "general"
        state["query_type"] = label
        return state

    def _generate_response(self, state: AgentState) -> AgentState:
        """Store the reply produced for the query and its intent label."""
        latest = state["messages"][-1].content
        # Call LLM or use fallback logic
        state["response_text"] = generate_response_for_type(
            latest, state["query_type"]
        )
        return state

    def _validate_response(self, state: AgentState) -> AgentState:
        """Swap in a clarifying question when the reply is under 20 chars."""
        too_short = len(state["response_text"]) < 20
        if too_short:
            state.update(
                response_text="Could you provide more details?",
                confidence=0.6,
            )
        return state

    def process(self, query: str) -> Dict[str, Any]:
        """Run ``query`` through the graph and return the key results.

        Args:
            query: The customer's message.

        Returns:
            A dict with ``response``, ``query_type`` and ``confidence`` taken
            from the final workflow state.
        """
        start_state = AgentState(
            messages=[HumanMessage(content=query)],
            query_type="",
            confidence=0.8,
            response_text="",
        )
        end_state = self.graph.invoke(start_state)
        # Output key -> state key it is read from.
        wanted = {
            "response": "response_text",
            "query_type": "query_type",
            "confidence": "confidence",
        }
        return {
            out_key: end_state[state_key]
            for out_key, state_key in wanted.items()
        }
@step
def run_langgraph_agent(
    query: str
) -> Annotated[Dict[str, Any], "langgraph_results"]:
    """Build a ``CustomerServiceAgent`` and process ``query`` with it.

    Args:
        query: The customer's message.

    Returns:
        The dict returned by ``CustomerServiceAgent.process``.
    """
    return CustomerServiceAgent().process(query)
@pipeline
def langgraph_pipeline(query: str) -> str:
    """Run the LangGraph agent step and format its output.

    Args:
        query: The customer's message.

    Returns:
        The formatted agent response.
    """
    workflow_output = run_langgraph_agent(query)
    formatted = format_response(workflow_output)
    return formatted
Hybrid architectures
Combine traditional ML with LLM agents:
@step
def train_classifier(
    queries: List[str], labels: List[str]
) -> Annotated[Any, "intent_classifier"]:
    """Fit a TF-IDF + multinomial naive Bayes intent classifier.

    Args:
        queries: Training texts.
        labels: Intent label for each training text.

    Returns:
        The fitted scikit-learn ``Pipeline``.
    """
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline

    vectorizer = TfidfVectorizer(max_features=100)
    estimator = MultinomialNB()
    model = Pipeline([("tfidf", vectorizer), ("clf", estimator)])
    model.fit(queries, labels)
    return model
@step
def hybrid_agent(
    query: str, classifier: Any, confidence_threshold: float = 0.7
) -> Annotated[Dict[str, Any], "hybrid_response"]:
    """Answer ``query`` with a template, or with the LLM when unsure.

    The classifier predicts the intent and a confidence (the largest
    predicted class probability). Below ``confidence_threshold`` the LLM is
    called; otherwise a template response for the intent is used.

    Args:
        query: The user's question.
        classifier: Fitted model exposing ``predict`` and ``predict_proba``.
        confidence_threshold: Confidence below which the LLM is used.
            Defaults to 0.7.

    Returns:
        A dict with the keys ``response``, ``intent``, ``confidence`` and
        ``used_llm``; the last three are plain ``str``/``float``/``bool``.
    """
    # Fast classification with traditional ML. Convert NumPy scalars to
    # native Python types so the returned dict holds only plain values.
    intent = str(classifier.predict([query])[0])
    confidence = float(classifier.predict_proba([query]).max())

    # Decide once, so the branch taken and the reported flag cannot disagree.
    used_llm = confidence < confidence_threshold

    # Use LLM only for complex queries
    if used_llm:
        response = call_llm(query, intent)
    else:
        response = get_template_response(intent)
    return {
        "response": response,
        "intent": intent,
        "confidence": confidence,
        "used_llm": used_llm,
    }
@pipeline
def hybrid_pipeline() -> str:
    """Train the intent classifier, then answer one sample query with it.

    Returns:
        The formatted response for the sample query
        "I want to return my order".
    """
    sample_query = "I want to return my order"

    # Train classifier once
    training_queries, training_labels = load_training_data()
    intent_model = train_classifier(training_queries, training_labels)

    # Use for inference
    agent_output = hybrid_agent(sample_query, intent_model)
    return format_response(agent_output)
Error handling
Robust error handling for production:
@step
def run_agent_with_retry(
    query: str, max_retries: int = 3
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute the agent, retrying failed attempts with exponential backoff.

    Waits 1 s, 2 s, 4 s, ... between consecutive attempts. A dict is always
    returned, including when ``max_retries`` is zero or negative (no attempt
    is made and an error result is returned).

    Args:
        query: Text handed to the agent's ``process`` method.
        max_retries: Maximum number of attempts. Defaults to 3.

    Returns:
        On success: ``response``, ``status`` (``"success"``) and the 1-based
        ``attempt`` that succeeded. On failure: a fallback ``response``,
        ``status`` (``"error"``), ``error_message`` with the last error, and
        ``attempt`` set to the number of attempts made.
    """
    import time

    last_error = "No attempt was made because max_retries is less than 1."
    for attempt in range(max_retries):
        if attempt:
            # Exponential backoff before each retry: 1 s, 2 s, 4 s, ...
            time.sleep(2 ** (attempt - 1))
        try:
            agent = initialize_agent()
            result = agent.process(query)
        except Exception as e:
            # Deliberately broad: any agent failure should lead to a retry
            # and, ultimately, a fallback response instead of a crash.
            last_error = str(e)
        else:
            return {
                "response": result.text,
                "status": "success",
                "attempt": attempt + 1,
            }
    return {
        "response": "I apologize, but I'm experiencing technical difficulties.",
        "status": "error",
        "error_message": last_error,
        "attempt": max(max_retries, 0),
    }
Observability integration
Track agent performance with Langfuse:
import os

from langfuse import Langfuse

# Module-level Langfuse client; both keys are read from environment
# variables (os.getenv returns None when a variable is unset).
langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
)
@step
def run_agent_with_tracing(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Run the agent inside a Langfuse trace and return the traced result.

    The agent call runs within an ``agent_processing`` span whose metadata
    records token usage and latency; the trace output is set to the
    response text afterwards.

    Args:
        query: Text handed to the agent's ``process`` method.

    Returns:
        A dict with the keys ``response``, ``trace_id`` and ``tokens_used``.
    """
    trace_metadata = {"query_length": len(query)}
    trace = langfuse.trace(name="agent_execution", metadata=trace_metadata)

    with trace.span(name="agent_processing") as span:
        outcome = initialize_agent().process(query)
        usage = {
            "tokens_used": outcome.tokens_used,
            "latency_ms": outcome.latency_ms,
        }
        span.update(metadata=usage)

    trace.update(output=outcome.text)

    traced: Dict[str, Any] = {"response": outcome.text}
    traced["trace_id"] = trace.id
    traced["tokens_used"] = outcome.tokens_used
    return traced
Best practices
- **Use artifacts**: Store all agent outputs as versioned artifacts with `Annotated`.
- **Capture metadata**: Log latency, tokens, costs, and confidence scores.
- **Handle errors gracefully**: Return status fields and fallback responses.
- **Enable caching carefully**: Set `enable_cache=False` for non-deterministic agents.
- **Structure outputs**: Use Pydantic models or dicts, not raw strings.
- **Deploy with Docker**: Package dependencies with `DockerSettings`.
- **Monitor production**: Integrate observability tools like Langfuse.
- **Test systematically**: Build evaluation pipelines to compare architectures.
Next steps
- **Agent frameworks**: Integration guides for 12+ agent frameworks.
- **Agent evaluation**: Build systematic evaluation pipelines.
- **Deploying agents**: Complete deployment example with web UI.
- **Agent comparison**: Compare multiple agent architectures.