
Orchestrating AI agents requires different patterns than traditional ML pipelines. This guide covers the essential techniques for building robust, reproducible agent workflows.

Agent pipeline anatomy

A typical agent pipeline has three core components: a step that runs the agent, a step that formats its output, and a pipeline function that connects the two:
from zenml import pipeline, step
from typing import Annotated, Dict, Any

@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute the agent and capture results."""
    # initialize_agent() is a placeholder for your own agent factory
    agent = initialize_agent()
    result = agent.process(query)
    
    return {
        "response": result.text,
        "confidence": result.confidence,
        "latency_ms": result.latency_ms,
        "tokens_used": result.tokens_used,
    }

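@step
def format_response(
    results: Dict[str, Any]
) -> Annotated[str, "formatted_response"]:
    """Format agent output for consumption."""
    # Only "response" is required. The other fields are optional because
    # not every agent step in this guide reports all of them.
    lines = [f"Response: {results['response']}", ""]
    if "confidence" in results:
        lines.append(f"Confidence: {results['confidence']:.2f}")
    if "latency_ms" in results:
        lines.append(f"Latency: {results['latency_ms']:.1f}ms")
    if "tokens_used" in results:
        lines.append(f"Tokens: {results['tokens_used']}")
    return "\n".join(lines)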

@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Main agent orchestration pipeline."""
    results = run_agent(query)
    formatted = format_response(results)
    return formatted

Key principles

  1. Annotated outputs: Use Annotated[Type, "artifact_name"] to track all outputs as versioned artifacts
  2. Structured results: Return dictionaries or Pydantic models, not just strings
  3. Error handling: Wrap agent calls in try-except blocks with status tracking
  4. Metadata capture: Log execution details (latency, tokens, costs) for analysis
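
Principles 2 to 4 can be combined in one small wrapper. The sketch below is framework-agnostic and uses only the standard library; `AgentResult`, `run_with_status`, and the `echo_agent` stand-in are hypothetical names, not part of ZenML. In a pipeline, the body of `run_with_status` would typically live inside a `@step` function.

```python
import time
from dataclasses import asdict, dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class AgentResult:
    """Structured result (hypothetical shape, not a ZenML type)."""
    response: str
    status: str  # "success" or "error"
    latency_ms: float
    error_message: Optional[str] = None


def run_with_status(agent_fn: Callable[[str], str], query: str) -> Dict[str, Any]:
    """Call agent_fn, recording status and wall-clock latency."""
    start = time.perf_counter()
    try:
        text = agent_fn(query)
        status, error = "success", None
    except Exception as exc:  # broad on purpose: any failure becomes a status
        text, status, error = "", "error", str(exc)
    latency_ms = (time.perf_counter() - start) * 1000
    return asdict(AgentResult(text, status, latency_ms, error))


def echo_agent(query: str) -> str:
    """Stand-in agent used only for this example."""
    if not query:
        raise ValueError("empty query")
    return f"You asked: {query}"


ok = run_with_status(echo_agent, "hello")
failed = run_with_status(echo_agent, "")
print(ok["status"], failed["status"], failed["error_message"])
# prints: success error empty query
```

Returning a plain dict keeps the result easy to store as an artifact, while the dataclass documents which fields downstream steps can rely on.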

Deployment patterns

Local execution

Run pipelines locally for development and testing:
if __name__ == "__main__":
    # Run with default parameters
    agent_pipeline()
    
    # Run with custom query
    agent_pipeline(query="What's the weather in Berlin?")

HTTP deployment

Deploy as a production HTTP service:
from zenml.config import DeploymentSettings, CORSConfig

deployment_settings = DeploymentSettings(
    app_title="Customer Support Agent",
    cors=CORSConfig(allow_origins=["*"]),  # Permissive; restrict origins in production
    dashboard_files_path="ui",  # Serve web UI
)

@pipeline(
    settings={"deployment": deployment_settings},
    enable_cache=False
)
def agent_api(query: str) -> str:
    """Agent deployed as HTTP endpoint."""
    response = run_agent(query)
    return format_response(response)
Deploy and invoke:
# Deploy the pipeline
zenml pipeline deploy agent_api --name support-agent

# Get endpoint URL
zenml deployment describe support-agent

# Invoke via CLI
zenml deployment invoke support-agent --query="How do I return an item?"

# Invoke via HTTP
curl -X POST http://localhost:8000/invoke \
  -H "Content-Type: application/json" \
  -d '{"parameters": {"query": "How do I return an item?"}}'
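
The same HTTP call can be made from Python. This is a minimal sketch using only the standard library; it mirrors the URL and payload of the curl command above, and `build_invoke_request` is a hypothetical helper name. The shape of the response body is not shown in this guide, so the commented-out part simply assumes it is JSON.

```python
import json
import urllib.request


def build_invoke_request(base_url: str, query: str) -> urllib.request.Request:
    """Build the POST request that mirrors the curl call."""
    payload = json.dumps({"parameters": {"query": query}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/invoke",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_invoke_request("http://localhost:8000", "How do I return an item?")
print(request.get_method(), request.full_url)
# prints: POST http://localhost:8000/invoke

# To actually send it (requires the deployment to be running):
# with urllib.request.urlopen(request, timeout=30) as resp:
#     print(json.loads(resp.read()))
```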

Docker configuration

Package agents with their dependencies:
from zenml.config import DockerSettings

docker_settings = DockerSettings(
    requirements="requirements.txt",
    python_package_installer="uv",  # Fast installs
    environment={
        "OPENAI_API_KEY": "${OPENAI_API_KEY}",
        "LANGFUSE_PUBLIC_KEY": "${LANGFUSE_PUBLIC_KEY}",
        "LANGFUSE_SECRET_KEY": "${LANGFUSE_SECRET_KEY}",
    },
)

@pipeline(settings={"docker": docker_settings})
def agent_pipeline(query: str) -> str:
    response = run_agent(query)
    return format_response(response)
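
The settings above reference three environment variables. A cheap guard is to check that they are set before building or running anything. The sketch below assumes nothing about how ZenML resolves these values; `missing_env_vars` is a hypothetical helper, and the example passes an explicit mapping so the result does not depend on your shell.

```python
import os
from typing import Iterable, List, Mapping

REQUIRED_ENV_VARS = ("OPENAI_API_KEY", "LANGFUSE_PUBLIC_KEY", "LANGFUSE_SECRET_KEY")


def missing_env_vars(
    names: Iterable[str], environ: Mapping[str, str] = os.environ
) -> List[str]:
    """Return the names that are unset or empty in environ."""
    return [name for name in names if not environ.get(name)]


# Made-up values for illustration only.
example_env = {"OPENAI_API_KEY": "sk-example", "LANGFUSE_PUBLIC_KEY": ""}
print(missing_env_vars(REQUIRED_ENV_VARS, example_env))
# prints: ['LANGFUSE_PUBLIC_KEY', 'LANGFUSE_SECRET_KEY']
```

In a real script you would call `missing_env_vars(REQUIRED_ENV_VARS)` and abort with a clear message if the list is not empty.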

Multi-agent orchestration

Routing pattern

Route queries to specialized agents:
@step
def classify_intent(
    query: str
) -> Annotated[str, "intent"]:
    """Classify query intent for routing."""
    query_lower = query.lower()
    
    if any(word in query_lower for word in ["return", "refund"]):
        return "returns_specialist"
    elif any(word in query_lower for word in ["billing", "payment"]):
        return "billing_specialist"
    elif any(word in query_lower for word in ["technical", "setup"]):
        return "technical_support"
    return "general_support"

@step
def run_specialist(
    query: str, intent: str
) -> Annotated[Dict[str, Any], "specialist_response"]:
    """Execute appropriate specialist agent."""
    agents = {
        "returns_specialist": ReturnsAgent(),
        "billing_specialist": BillingAgent(),
        "technical_support": TechnicalAgent(),
        "general_support": GeneralAgent(),
    }
    
    agent = agents.get(intent, agents["general_support"])
    result = agent.process(query)
    
    return {
        "response": result.text,
        "specialist": intent,
        "confidence": result.confidence,
    }

@pipeline
def multi_agent_pipeline(query: str) -> str:
    """Multi-specialist agent system."""
    intent = classify_intent(query)
    results = run_specialist(query, intent)
    return format_response(results)
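
The keyword routing in `classify_intent` is easier to test and extend when the keywords live in a table. The sketch below is a plain-Python equivalent with the same substring semantics; `ROUTES` and `route` are hypothetical names. Because matching stops at the first hit, the order of the table decides ties.

```python
from typing import Dict, Tuple

# Order matters: the first route with a matching keyword wins.
ROUTES: Dict[str, Tuple[str, ...]] = {
    "returns_specialist": ("return", "refund"),
    "billing_specialist": ("billing", "payment"),
    "technical_support": ("technical", "setup"),
}


def route(query: str, default: str = "general_support") -> str:
    """Substring-based routing with the same rules as classify_intent."""
    query_lower = query.lower()
    for specialist, keywords in ROUTES.items():
        if any(keyword in query_lower for keyword in keywords):
            return specialist
    return default


print(route("I need a REFUND"))            # returns_specialist
print(route("Refund my billing payment"))  # returns_specialist (first match wins)
print(route("Where is your office?"))      # general_support
```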

CrewAI integration

Orchestrate agent crews:
from crewai import Agent, Crew, Task
from crewai.tools import tool

@tool("Weather Checker Tool")
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"Current weather in {city}: Sunny, 22°C"

weather_checker = Agent(
    role="Weather Specialist",
    goal="Check weather conditions for {city}",
    backstory="Expert meteorologist",
    tools=[get_weather],
    verbose=True,
)

travel_advisor = Agent(
    role="Travel Advisor",
    goal="Give travel advice based on weather for {city}",
    backstory="Experienced travel consultant",
    verbose=True,
)

check_weather_task = Task(
    description="Check weather in {city}",
    expected_output="Weather report",
    agent=weather_checker,
)

packing_advice_task = Task(
    description="Provide packing advice for {city}",
    expected_output="List of items to pack",
    agent=travel_advisor,
    context=[check_weather_task],
)

crew = Crew(
    agents=[weather_checker, travel_advisor],
    tasks=[check_weather_task, packing_advice_task],
    verbose=True,
)

@step
def run_crew(city: str) -> Annotated[Dict[str, Any], "crew_results"]:
    """Execute CrewAI crew."""
    result = crew.kickoff(inputs={"city": city})
    return {"city": city, "result": str(result), "status": "success"}

@pipeline
def travel_planning_pipeline(city: str = "Berlin") -> str:
    """CrewAI travel planning."""
    crew_results = run_crew(city)
    # format_travel_results is a formatting step assumed to be defined elsewhere
    formatted = format_travel_results(crew_results)
    return formatted
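
The pipeline above calls `format_travel_results`, which this guide does not define. The following is one possible sketch, assuming only the dict shape returned by `run_crew` (`city`, `result`, `status`). It is written as a plain function so it runs on its own; in a ZenML project it would be decorated with `@step` like `format_response`.

```python
from typing import Any, Dict


def format_travel_results(crew_results: Dict[str, Any]) -> str:
    """Hypothetical formatter for the dict returned by run_crew."""
    city = crew_results.get("city", "unknown city")
    status = crew_results.get("status", "unknown")
    # Fall back to a visible marker when the crew returned nothing useful.
    body = crew_results.get("result", "").strip() or "(no advice returned)"
    return f"Travel plan for {city} [{status}]\n{body}"


print(format_travel_results(
    {"city": "Berlin", "result": "Pack sunglasses.", "status": "success"}
))
```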

LangGraph workflows

Build stateful agent workflows:
from langgraph.graph import StateGraph, END, START
from langchain_core.messages import BaseMessage, HumanMessage
from typing import Any, Dict, List, TypedDict

class AgentState(TypedDict):
    """State for customer service workflow."""
    messages: List[BaseMessage]
    query_type: str
    confidence: float
    response_text: str

class CustomerServiceAgent:
    """LangGraph-based agent."""
    
    def __init__(self):
        self.graph = self._build_graph()
    
    def _build_graph(self):
        workflow = StateGraph(AgentState)
        
        # Add nodes
        workflow.add_node("analyze_query", self._analyze_query)
        workflow.add_node("classify_intent", self._classify_intent)
        workflow.add_node("generate_response", self._generate_response)
        workflow.add_node("validate_response", self._validate_response)
        
        # Add edges
        workflow.add_edge(START, "analyze_query")
        workflow.add_edge("analyze_query", "classify_intent")
        workflow.add_edge("classify_intent", "generate_response")
        workflow.add_edge("generate_response", "validate_response")
        workflow.add_edge("validate_response", END)
        
        return workflow.compile()
    
    def _analyze_query(self, state: AgentState) -> AgentState:
        """Analyze query complexity."""
        query = state["messages"][-1].content
        state["confidence"] = 0.9 if len(query.split()) < 10 else 0.8
        return state
    
    def _classify_intent(self, state: AgentState) -> AgentState:
        """Classify customer intent."""
        query = state["messages"][-1].content.lower()
        if "return" in query or "refund" in query:
            state["query_type"] = "returns"
        elif "billing" in query or "payment" in query:
            state["query_type"] = "billing"
        else:
            state["query_type"] = "general"
        return state
    
    def _generate_response(self, state: AgentState) -> AgentState:
        """Generate response based on intent."""
        query = state["messages"][-1].content
        query_type = state["query_type"]
        
        # Call LLM or use fallback logic
        response = generate_response_for_type(query, query_type)
        state["response_text"] = response
        return state
    
    def _validate_response(self, state: AgentState) -> AgentState:
        """Validate response quality."""
        if len(state["response_text"]) < 20:
            state["response_text"] = "Could you provide more details?"
            state["confidence"] = 0.6
        return state
    
    def process(self, query: str) -> Dict[str, Any]:
        """Process query through workflow."""
        initial_state = AgentState(
            messages=[HumanMessage(content=query)],
            query_type="",
            confidence=0.8,
            response_text="",
        )
        final_state = self.graph.invoke(initial_state)
        return {
            "response": final_state["response_text"],
            "query_type": final_state["query_type"],
            "confidence": final_state["confidence"],
        }

@step
def run_langgraph_agent(
    query: str
) -> Annotated[Dict[str, Any], "langgraph_results"]:
    """Execute LangGraph workflow."""
    agent = CustomerServiceAgent()
    return agent.process(query)

@pipeline
def langgraph_pipeline(query: str) -> str:
    """LangGraph agent pipeline."""
    results = run_langgraph_agent(query)
    return format_response(results)
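
`_generate_response` relies on a helper, `generate_response_for_type`, that is not shown in this guide. Below is a minimal template-based sketch of what the fallback branch could look like; the `TEMPLATES` table and its wording are made up for illustration, and a real implementation would more likely call an LLM.

```python
from typing import Dict

# Hypothetical canned replies keyed by the query types set in _classify_intent.
TEMPLATES: Dict[str, str] = {
    "returns": "You can start a return from your order history page.",
    "billing": "Billing questions are handled in your account settings.",
    "general": "Thanks for reaching out. Could you tell us a bit more?",
}


def generate_response_for_type(query: str, query_type: str) -> str:
    """Template fallback; unknown query types use the general template."""
    template = TEMPLATES.get(query_type, TEMPLATES["general"])
    return f"{template} (Regarding: {query!r})"


reply = generate_response_for_type("How do I get a refund?", "returns")
print(reply)
print(len(reply) >= 20)  # True, so _validate_response keeps the reply
```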

Hybrid architectures

Combine traditional ML with LLM agents:
@step
def train_classifier(
    queries: List[str], labels: List[str]
) -> Annotated[Any, "intent_classifier"]:
    """Train intent classification model."""
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.pipeline import Pipeline
    
    classifier = Pipeline([
        ('tfidf', TfidfVectorizer(max_features=100)),
        ('clf', MultinomialNB()),
    ])
    classifier.fit(queries, labels)
    return classifier

@step
def hybrid_agent(
    query: str, classifier: Any
) -> Annotated[Dict[str, Any], "hybrid_response"]:
    """Use classifier + LLM for better results."""
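    # Fast classification with traditional ML.
    # Cast NumPy scalars to built-in types so the returned dict holds plain values.
    intent = str(classifier.predict([query])[0])
    confidence = float(classifier.predict_proba([query]).max())
    
    # Fall back to the LLM only when the classifier is unsure
    if confidence < 0.7: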
        response = call_llm(query, intent)
    else:
        response = get_template_response(intent)
    
    return {
        "response": response,
        "intent": intent,
        "confidence": confidence,
        "used_llm": confidence < 0.7,
    }

@pipeline
def hybrid_pipeline() -> str:
    """Hybrid ML + LLM architecture."""
    # Train classifier once
    queries, labels = load_training_data()
    classifier = train_classifier(queries, labels)
    
    # Use for inference
    test_query = "I want to return my order"
    results = hybrid_agent(test_query, classifier)
    return format_response(results)
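
The confidence gate in `hybrid_agent` can be exercised without scikit-learn or an LLM by injecting the two responders. In this sketch, `answer_with_gate`, `fake_llm`, and `fake_template` are hypothetical names; the stand-ins replace `call_llm` and `get_template_response`, which this guide does not define.

```python
from typing import Any, Callable, Dict


def answer_with_gate(
    query: str,
    intent: str,
    confidence: float,
    llm_fn: Callable[[str, str], str],
    template_fn: Callable[[str], str],
    threshold: float = 0.7,
) -> Dict[str, Any]:
    """Use the template when the classifier is confident, else the LLM."""
    used_llm = confidence < threshold
    response = llm_fn(query, intent) if used_llm else template_fn(intent)
    return {
        "response": response,
        "intent": intent,
        "confidence": float(confidence),  # plain float, also for NumPy scalars
        "used_llm": used_llm,
    }


def fake_llm(query: str, intent: str) -> str:
    """Stand-in for call_llm."""
    return f"LLM answer for {intent}: {query}"


def fake_template(intent: str) -> str:
    """Stand-in for get_template_response."""
    return f"Template answer for {intent}"


confident = answer_with_gate(
    "I want to return my order", "returns", 0.92, fake_llm, fake_template
)
unsure = answer_with_gate(
    "It broke, sort of?", "returns", 0.41, fake_llm, fake_template
)
print(confident["used_llm"], unsure["used_llm"])  # False True
```

Keeping the threshold as a parameter makes it straightforward to tune the trade-off between LLM cost and answer quality.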

Error handling

Robust error handling for production:
@step
def run_agent_with_retry(
    query: str, max_retries: int = 3
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute agent with retry logic."""
    import time
    
    for attempt in range(max_retries):
        try:
            agent = initialize_agent()
            result = agent.process(query)
            
            return {
                "response": result.text,
                "status": "success",
                "attempt": attempt + 1,
            }
        except Exception as e:
            if attempt == max_retries - 1:
                return {
                    "response": "I apologize, but I'm experiencing technical difficulties.",
                    "status": "error",
                    "error_message": str(e),
                    "attempt": attempt + 1,
                }
            time.sleep(2 ** attempt)  # Exponential backoff
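
With `max_retries=3`, the loop above sleeps for 1 and then 2 seconds; the final attempt returns the fallback instead of sleeping. The sketch below isolates the same backoff schedule in a reusable helper with an injectable `sleep`, so the delays can be checked without waiting. `retry_with_backoff` and `flaky` are hypothetical names. Unlike the step above, this helper re-raises the last error rather than returning a fallback dict, and it rejects `max_retries < 1` explicitly.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    fn: Callable[[], T],
    max_retries: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to max_retries times, sleeping 2**attempt seconds between tries."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            sleep(2 ** attempt)
    raise AssertionError("unreachable")


# Demo: fail twice, then succeed. Record the delays instead of sleeping.
calls = {"count": 0}
delays: List[float] = []


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"


result = retry_with_backoff(flaky, max_retries=3, sleep=delays.append)
print(result, delays)  # ok [1, 2]
```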

Observability integration

Track agent performance with Langfuse:
import os

from langfuse import Langfuse

langfuse = Langfuse(
    public_key=os.getenv("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.getenv("LANGFUSE_SECRET_KEY"),
)

@step
def run_agent_with_tracing(
    query: str
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Agent execution with Langfuse tracing."""
    trace = langfuse.trace(
        name="agent_execution",
        metadata={"query_length": len(query)},
    )
    
    with trace.span(name="agent_processing") as span:
        agent = initialize_agent()
        result = agent.process(query)
        
        span.update(
            metadata={
                "tokens_used": result.tokens_used,
                "latency_ms": result.latency_ms,
            }
        )
    
    trace.update(output=result.text)
    
    return {
        "response": result.text,
        "trace_id": trace.id,
        "tokens_used": result.tokens_used,
    }

Best practices

  1. Use artifacts: Store all agent outputs as versioned artifacts with Annotated
  2. Capture metadata: Log latency, tokens, costs, and confidence scores
  3. Handle errors gracefully: Return status fields and fallback responses
  4. Enable caching carefully: Set enable_cache=False for non-deterministic agents
  5. Structure outputs: Use Pydantic models or dicts, not raw strings
  6. Deploy with Docker: Package dependencies with DockerSettings
  7. Monitor production: Integrate observability tools like Langfuse
  8. Test systematically: Build evaluation pipelines to compare architectures
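
As an illustration of practice 8, the sketch below compares two candidate routers on a small labeled set. Everything here is hypothetical: the dataset is a made-up toy example, and `accuracy`, `keyword_router`, and `always_general` are illustrative names. In a ZenML project, each candidate run and its score would typically become a step output so results are versioned.

```python
from typing import Callable, Dict, List, Tuple

LabeledQuery = Tuple[str, str]  # (query, expected_intent)


def accuracy(predict: Callable[[str], str], dataset: List[LabeledQuery]) -> float:
    """Fraction of queries for which predict returns the expected label."""
    if not dataset:
        raise ValueError("dataset must not be empty")
    hits = sum(1 for query, expected in dataset if predict(query) == expected)
    return hits / len(dataset)


def keyword_router(query: str) -> str:
    """Candidate A: keyword matching."""
    lowered = query.lower()
    return "returns" if "refund" in lowered or "return" in lowered else "general"


def always_general(query: str) -> str:
    """Candidate B: trivial baseline."""
    return "general"


# Toy data for illustration only.
DATASET: List[LabeledQuery] = [
    ("I want a refund", "returns"),
    ("Can I return this?", "returns"),
    ("What are your opening hours?", "general"),
    ("Do you ship to Berlin?", "general"),
]

scores: Dict[str, float] = {
    "keyword_router": accuracy(keyword_router, DATASET),
    "always_general": accuracy(always_general, DATASET),
}
print(scores)  # {'keyword_router': 1.0, 'always_general': 0.5}
```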

Next steps

Agent frameworks

Integration guides for 12+ agent frameworks

Agent evaluation

Build systematic evaluation pipelines

Deploying agents

Complete deployment example with web UI

Agent comparison

Compare multiple agent architectures
