Documentation index: fetch the complete documentation index at https://mintlify.com/zenml-io/zenml/llms.txt
Use this file to discover all available pages before exploring further.
This collection demonstrates ZenML integration patterns for 12 popular agent frameworks. Each example follows consistent patterns while showcasing framework-specific features.
Overview
All examples are available at:
https://github.com/zenml-io/zenml/tree/main/examples/agent_framework_integrations
Quick start
Run any framework example:
# Replace <framework-name> with the directory of the example to run
cd agent_framework_integrations/<framework-name>/
# Create virtual environment
uv venv --python 3.11
source .venv/bin/activate
# Install dependencies
uv pip install -r requirements.txt
# Set API key (no spaces around "=", otherwise the shell does not treat it as an assignment)
export OPENAI_API_KEY=sk-xxx
# Run the example
python run.py
Framework examples
LangGraph
Stateful graph-based workflows
# langgraph/langgraph_agent.py
from langchain.agents import create_agent
# Stub weather tool: the reply is fixed and only echoes the requested city.
# The docstring is left untouched because it is handed to create_agent with the function.
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
# Module-level agent: one tool (get_weather) plus a generic system prompt.
agent = create_agent(
    model="openai:gpt-4",
    tools=[get_weather],
    system_prompt="You are a helpful assistant",
)
# langgraph/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from langgraph_agent import agent
@step
def run_langgraph_agent(
    query: str,
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute LangGraph agent.

    Args:
        query: Text sent to the agent as a single user message.

    Returns:
        A dict holding the content of the last returned message under
        ``"response"`` and ``"status"`` set to ``"success"``.
    """
    result = agent.invoke(
        {"messages": [{"role": "user", "content": query}]}
    )
    # The last entry of the returned message list is used as the answer.
    return {
        "response": result["messages"][-1].content,
        "status": "success",
    }
@step
def format_response(
    results: Dict[str, Any],
) -> Annotated[str, "formatted_response"]:
    """Format agent output.

    Args:
        results: Dict that must contain a ``"response"`` entry.

    Returns:
        The response prefixed with ``"Response: "``.
    """
    return f"Response: {results['response']}"
@pipeline
def agent_pipeline(query: str = "What's the weather in SF?") -> str:
    """LangGraph agent pipeline.

    Runs the agent step on ``query`` and passes its output to the
    formatting step.
    """
    results = run_langgraph_agent(query)
    formatted = format_response(results)
    return formatted
# Script entry point: announce the run, then execute the pipeline with its default query.
if __name__ == "__main__":
    print("Running LangGraph agent pipeline...")
    agent_pipeline()
Key features:
Graph-based workflow definition
State management across nodes
ReAct pattern support
Complex multi-step reasoning
Requirements:
langgraph>=0.2.0
langchain-core>=0.3.0
langchain-openai>=0.2.0
zenml>=0.75.0
CrewAI
Multi-agent crews with role-based collaboration
# crewai/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from crewai import Agent, Crew, Task
from crewai.tools import tool
# Stub weather tool registered under the name "Weather Checker Tool".
# The docstring is kept as written because it travels with the tool definition.
@tool("Weather Checker Tool")
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"Current weather in {city}: Sunny, 22°C"
# Define agents
# The weather specialist is the only agent given the get_weather tool.
weather_checker = Agent(
    role="Weather Specialist",
    goal="Check weather conditions for {city} ",
    backstory="Expert meteorologist",
    tools=[get_weather],
    verbose=True,
)
# The travel advisor has no tools of its own.
travel_advisor = Agent(
    role="Travel Advisor",
    goal="Give travel advice based on weather for {city} ",
    backstory="Experienced travel consultant",
    verbose=True,
)
# Define tasks
check_weather_task = Task(
    description="Check weather in {city} ",
    expected_output="Weather report",
    agent=weather_checker,
)
# The packing task lists the weather task as its context.
packing_advice_task = Task(
    description="Provide packing advice for {city} ",
    expected_output="List of items to pack",
    agent=travel_advisor,
    context=[check_weather_task],
)
# Create crew
# Both agents and both tasks are registered, weather check first.
crew = Crew(
    agents=[weather_checker, travel_advisor],
    tasks=[check_weather_task, packing_advice_task],
    verbose=True,
)
@step
def run_crewai_crew(
    city: str,
) -> Annotated[Dict[str, Any], "crew_results"]:
    """Execute CrewAI crew.

    Args:
        city: Value supplied to the crew as the ``"city"`` input.

    Returns:
        A dict with the city, the kickoff result converted to ``str`` under
        ``"result"``, and ``"status"`` set to ``"success"``.
    """
    result = crew.kickoff(inputs={"city": city})
    # str() turns the kickoff result into plain text before it leaves the step.
    return {"city": city, "result": str(result), "status": "success"}
@step
def format_travel_results(
    crew_data: Dict[str, Any],
) -> Annotated[str, "formatted_results"]:
    """Format crew results.

    Args:
        crew_data: Dict that must contain ``"city"`` and ``"result"``.

    Returns:
        A three-line report: a title with the upper-cased city, a rule of
        50 ``=`` characters, and the crew result, ending with a newline.
    """
    # The string body stays at column 0 so the report lines carry no indent.
    return f"""Travel Planning for {crew_data['city'].upper()}
{'=' * 50}
{crew_data['result']}
"""
@pipeline
def travel_planning_pipeline(city: str = "Berlin") -> str:
    """CrewAI travel planning pipeline.

    Runs the crew step for ``city`` and passes its output to the
    formatting step.
    """
    crew_results = run_crewai_crew(city)
    formatted = format_travel_results(crew_results)
    return formatted
# Script entry point: announce the run, then execute the pipeline with its default city.
if __name__ == "__main__":
    print("Running CrewAI travel planning pipeline...")
    travel_planning_pipeline()
Key features:
Role-based agent collaboration
Task dependencies and context sharing
Built-in tool decorators
Sequential task execution
Requirements:
crewai>=0.80.0
crewai-tools>=0.12.0
zenml>=0.75.0
LangChain
Composable chains and runnables
# langchain/langchain_agent.py
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
def load_docs(input_dict: dict) -> str:
    """Load docs from URL.

    Args:
        input_dict: Mapping read for a ``"url"`` entry.

    Returns:
        The ``page_content`` of every loaded document joined with newlines;
        ``"No URL provided."`` when the URL is missing or empty; or an
        ``"Error loading URL: ..."`` message when loading raises.
    """
    url = input_dict.get("url")
    if not url:
        return "No URL provided."
    try:
        loader = WebBaseLoader(url)
        docs = loader.load()
        return "\n".join(doc.page_content for doc in docs)
    except Exception as e:
        # Best effort: the failure is returned as text instead of being raised,
        # so the caller receives a string either way.
        return f"Error loading URL: {e}"
# Define LLM and prompt
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template(
    "Write a concise summary of the following text: \n\n {text} "
)
# Create runnable chain
# Stages, in order: load the page text, wrap it as {"text": ...} for the
# prompt, call the model, then parse the model output into a string.
chain = (
    RunnableLambda(load_docs)
    | RunnableLambda(lambda text: {"text": text})
    | prompt
    | llm
    | StrOutputParser()
)
# langchain/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from langchain_agent import chain
@step
def run_langchain_chain(
    url: str,
) -> Annotated[Dict[str, Any], "chain_results"]:
    """Execute LangChain chain.

    Args:
        url: Address passed to the chain under the ``"url"`` key.

    Returns:
        On success, a dict with the chain output under ``"summary"`` and
        ``"status"`` set to ``"success"``. If invoking the chain raises, a
        dict with an empty summary, ``"status"`` set to ``"error"`` and the
        exception text under ``"error"``.
    """
    try:
        result = chain.invoke({"url": url})
        return {"summary": result, "status": "success"}
    except Exception as e:
        # Failures are reported in the returned dict rather than raised.
        return {"summary": "", "status": "error", "error": str(e)}
@pipeline
def summarization_pipeline(
    url: str = "https://docs.zenml.io",
) -> str:
    """LangChain document summarization pipeline."""
    results = run_langchain_chain(url)
    # NOTE(review): this subscripts the step's output inside the pipeline
    # function - confirm that ZenML supports indexing a step output here.
    return results["summary"]
# Script entry point: announce the run, then execute the pipeline with its default URL.
if __name__ == "__main__":
    print("Running LangChain summarization pipeline...")
    summarization_pipeline()
Key features:
Composable runnable chains
Document loaders and processors
Output parsers for structured data
Tool integration via decorators
Requirements:
langchain>=0.3.0
langchain-openai>=0.2.0
langchain-community>=0.3.0
zenml>=0.75.0
LlamaIndex
Function-based agents with async support
# llama_index/agent.py
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
# Stub weather function wrapped into a tool below; the reply only echoes the city.
# The docstring is left untouched because it is handed to FunctionTool with the function.
def get_weather(city: str) -> str:
    """Get weather for a given city."""
    return f"It's always sunny in {city}!"
# Create tool from function
weather_tool = FunctionTool.from_defaults(fn=get_weather)
# Create agent
llm = OpenAI(model="gpt-4o-mini")
agent = ReActAgent(
    tools=[weather_tool],
    llm=llm,
    verbose=True,
)
# llama_index/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from agent import agent
import asyncio
@step
def run_llamaindex_agent(
    query: str,
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute LlamaIndex agent.

    Args:
        query: Question passed to the agent's ``aquery`` coroutine.

    Returns:
        A dict with the response converted to ``str`` under ``"response"``
        and ``"status"`` set to ``"success"``.
    """
    # LlamaIndex agents are async
    response = asyncio.run(agent.aquery(query))
    return {
        "response": str(response),
        "status": "success",
    }
@pipeline
def agent_pipeline(query: str = "What's the weather in Tokyo?") -> str:
    """LlamaIndex agent pipeline."""
    results = run_llamaindex_agent(query)
    # NOTE(review): this subscripts the step's output inside the pipeline
    # function - confirm that ZenML supports indexing a step output here.
    return results["response"]
# Script entry point: announce the run, then execute the pipeline with its default query.
if __name__ == "__main__":
    print("Running LlamaIndex agent pipeline...")
    agent_pipeline()
Key features:
ReAct agent pattern
Async execution support
Function-based tools
RAG capabilities
Requirements:
llama-index>=0.11.0
llama-index-llms-openai>=0.2.0
zenml>=0.75.0
PydanticAI
Type-safe agents with structured outputs
# pydanticai/pydanticai_agent.py
from pydantic_ai import Agent
from pydantic import BaseModel
# Structured result model passed to the agent as its result type.
# Documented with comments only, so the model definition itself is unchanged.
class WeatherResponse(BaseModel):
    city: str
    temperature: float
    conditions: str
# Module-level agent whose results are typed as WeatherResponse.
agent = Agent(
    "openai:gpt-4",
    system_prompt="Provide weather information",
    result_type=WeatherResponse,
)
# Registered with tool_plain because the function takes no RunContext
# argument; @agent.tool is for tools whose first parameter is the run context.
@agent.tool_plain
def get_current_weather(city: str) -> dict:
    """Get current weather."""
    return {"city": city, "temp": 22, "conditions": "Sunny"}
# pydanticai/run.py
from zenml import pipeline, step
from typing import Annotated, Dict, Any
from pydanticai_agent import agent
@step
def run_pydanticai_agent(
    query: str,
) -> Annotated[Dict[str, Any], "agent_results"]:
    """Execute PydanticAI agent.

    Args:
        query: Prompt passed to the agent's synchronous ``run_sync`` call.

    Returns:
        A dict with the result model dumped to plain data under
        ``"response"`` and ``"status"`` set to ``"success"``.
    """
    result = agent.run_sync(query)
    # NOTE(review): newer pydantic-ai releases may expose the result as
    # `result.output` instead of `result.data` - confirm against the installed version.
    return {
        "response": result.data.model_dump(),
        "status": "success",
    }
@pipeline
def agent_pipeline(
    query: str = "What's the weather in Paris?",
) -> Dict[str, Any]:
    """PydanticAI agent pipeline.

    Runs the single agent step on ``query`` and returns its output.
    """
    results = run_pydanticai_agent(query)
    return results
# Script entry point: run the pipeline with its default query and print what the call returns.
if __name__ == "__main__":
    print("Running PydanticAI agent pipeline...")
    result = agent_pipeline()
    print(f"Result: {result}")
Key features:
Type-safe with Pydantic models
Structured output validation
Built-in tool decorators
Sync and async support
Requirements:
pydantic-ai>=0.0.14
pydantic>=2.0.0
zenml>=0.75.0
Common patterns
Pipeline structure
All examples follow this pattern:
from zenml import pipeline, step
from typing import Annotated, Dict, Any
@step
def run_agent(query: str) -> Annotated[Dict[str, Any], "results"]:
    """Execute framework-specific agent.

    Template step: ``initialize_framework_agent`` and ``extract_response``
    are not defined in this snippet and stand for framework-specific code.

    Args:
        query: Input passed to the agent's ``invoke`` call.

    Returns:
        A dict with the extracted response under ``"response"`` and
        ``"status"`` set to ``"success"``.
    """
    # Initialize agent
    agent = initialize_framework_agent()
    # Run agent
    result = agent.invoke(query)
    # Return structured results
    return {
        "response": extract_response(result),
        "status": "success",
    }
@step
def format_response(
    results: Dict[str, Any],
) -> Annotated[str, "formatted"]:
    """Format agent output.

    Args:
        results: Dict that must contain a ``"response"`` entry.

    Returns:
        The response prefixed with ``"Response: "``.
    """
    return f"Response: {results['response']}"
@pipeline
def agent_pipeline(query: str = "Default query") -> str:
    """Framework-agnostic pipeline.

    Runs the agent step on ``query`` and passes its output to the
    formatting step.
    """
    results = run_agent(query)
    formatted = format_response(results)
    return formatted
Error handling
@step
def run_agent_with_error_handling(
    query: str,
) -> Annotated[Dict[str, Any], "results"]:
    """Execute agent with comprehensive error handling.

    Template step: ``initialize_agent`` is not defined in this snippet and
    stands for framework-specific setup.

    Args:
        query: Input passed to the agent's ``process`` call.

    Returns:
        On success, a dict with the result text under ``"response"`` and
        ``"status"`` set to ``"success"``. If anything in the ``try`` body
        raises, a dict with a fixed apology under ``"response"``,
        ``"status"`` set to ``"error"`` and the exception text under
        ``"error_message"``.
    """
    try:
        agent = initialize_agent()
        result = agent.process(query)
        return {
            "response": result.text,
            "status": "success",
        }
    except Exception as e:
        # The step reports the failure in its output instead of raising.
        return {
            "response": "I apologize, but I'm experiencing technical difficulties.",
            "status": "error",
            "error_message": str(e),
        }
Deployment
All examples support deployment:
# Deploy any framework pipeline
zenml pipeline deploy agent_pipeline --name my-agent
# Invoke deployed agent (the value must follow "=" directly)
zenml deployment invoke my-agent --query="Your question here"
Framework-specific notes
Async frameworks
LlamaIndex and Semantic Kernel require async handling:
import asyncio
@step
def run_async_agent(query: str) -> Annotated[Dict[str, Any], "results"]:
    """Execute async agent.

    Args:
        query: Question passed to the agent's ``aquery`` coroutine.

    Returns:
        A dict with the result converted to ``str`` under ``"response"``
        and ``"status"`` set to ``"success"``.
    """
    # Wrap the awaitable in a coroutine so asyncio.run can drive it
    # from this synchronous step.
    async def run():
        return await agent.aquery(query)

    result = asyncio.run(run())
    return {"response": str(result), "status": "success"}
Different frameworks return different types:
# Illustrative one-liners: each shows how to pull the answer out of one
# framework's result object; they are alternatives, not a sequence.
# LangGraph/LangChain: Message objects
response = result["messages"][-1].content
# CrewAI: CrewOutput objects
response = str(result)
# LlamaIndex: Response objects
response = str(result)
# PydanticAI: Pydantic models
response = result.data.model_dump()
# OpenAI SDK: Message content
response = messages.data[0].content[0].text.value
Additional frameworks
The collection includes 7 more frameworks:
Haystack : RAG pipelines with retrieval
OpenAI Agents SDK : Official OpenAI implementation
Semantic Kernel : Microsoft’s plugin architecture
Autogen : Conversational multi-agent systems
AWS Strands : AWS Bedrock integration
Qwen-Agent : Qwen model integration
Google ADK : Gemini-powered agents
See the Agent Frameworks page for detailed integration patterns.
Running all examples
Test all frameworks:
cd agent_framework_integrations/
# Run all examples
for dir in */; do
  echo "Testing $dir"
  # Subshell keeps each directory change local, so a failed cd cannot
  # leave the loop in the wrong directory.
  (
    cd "$dir" || exit
    uv pip install -r requirements.txt
    python run.py
  )
done
Next steps
Agent frameworks Detailed integration patterns for all 12 frameworks
Deploying agents Deploy agents as production HTTP services
Agent comparison Systematic evaluation of agent architectures
Orchestrating agents Production agent orchestration patterns