Documentation Index
Fetch the complete documentation index at: https://mintlify.com/getzep/graphiti/llms.txt
Use this file to discover all available pages before exploring further.
Integrate Graphiti with LangGraph to build sophisticated agents that maintain persistent memory through knowledge graphs, enabling context-aware conversations and personalized responses.
Overview
LangGraph is LangChain’s library for building stateful, multi-actor applications with LLMs. Combined with Graphiti, you can:
- Maintain persistent conversation memory across sessions
- Extract and store structured knowledge from interactions
- Retrieve relevant context using graph-based search
- Build agents that learn and adapt over time
- Personalize responses based on user history
Installation
pip install graphiti-core langchain-openai langgraph
# For Jupyter notebooks
pip install ipywidgets
Ensure you have Neo4j or FalkorDB running:
# Neo4j via Docker
docker compose up
# Or FalkorDB
docker run -p 6379:6379 -p 3000:3000 -it --rm falkordb/falkordb:latest
Quick Start Example
This example demonstrates a sales agent that uses Graphiti to:
- Remember user preferences and conversation history
- Search product knowledge stored in the graph
- Personalize recommendations based on learned information
Initialize Graphiti
import os
from datetime import datetime, timezone
from graphiti_core import Graphiti
from graphiti_core.nodes import EpisodeType
# Configure Graphiti
neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USER', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD', 'password')
client = Graphiti(neo4j_uri, neo4j_user, neo4j_password)
await client.build_indices_and_constraints()
Load Product Data
Ingest product information into the knowledge graph:
import json
from pathlib import Path
async def ingest_products_data(client: Graphiti):
# Load product JSON
with open('products.json') as f:
products = json.load(f)['products']
# Add each product as an episode
for i, product in enumerate(products):
await client.add_episode(
name=product.get('title', f'Product {i}'),
episode_body=str({k: v for k, v in product.items() if k != 'images'}),
source_description='Product catalog',
source=EpisodeType.json,
reference_time=datetime.now(timezone.utc),
)
await ingest_products_data(client)
Create User Node
Establish a user entity in the graph:
from graphiti_core.search.search_config_recipes import NODE_HYBRID_SEARCH_EPISODE_MENTIONS
user_name = 'jess'
# Add initial user episode
await client.add_episode(
name='User Creation',
episode_body=f'{user_name} is interested in buying running shoes',
source=EpisodeType.text,
reference_time=datetime.now(timezone.utc),
source_description='SalesBot',
)
# Get user node UUID for centered searches
node_list = await client._search(user_name, NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
user_node_uuid = node_list.nodes[0].uuid
# Get product brand node UUID
node_list = await client._search('ManyBirds', NODE_HYBRID_SEARCH_EPISODE_MENTIONS)
brand_node_uuid = node_list.nodes[0].uuid
Create a LangChain tool for querying product information:
from langchain_core.tools import tool
from graphiti_core.edges import EntityEdge
def edges_to_facts_string(entities: list[EntityEdge]):
"""Convert entity edges to readable facts."""
return '-' + '\n- '.join([edge.fact for edge in entities])
@tool
async def get_shoe_data(query: str) -> str:
"""Search the knowledge graph for information about shoes."""
edge_results = await client.search(
query,
center_node_uuid=brand_node_uuid, # Center search on product brand
num_results=10,
)
return edges_to_facts_string(edge_results)
tools = [get_shoe_data]
Build the Agent
from langchain_core.messages import AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph, add_messages
from langgraph.prebuilt import ToolNode
from typing import Annotated
from typing_extensions import TypedDict
import asyncio
class State(TypedDict):
messages: Annotated[list, add_messages]
user_name: str
user_node_uuid: str
async def chatbot(state: State):
"""Agent node that retrieves context from Graphiti and generates responses."""
facts_string = None
if len(state['messages']) > 0:
last_message = state['messages'][-1]
query = f'{"SalesBot" if isinstance(last_message, AIMessage) else state["user_name"]}: {last_message.content}'
# Search Graphiti using user's node as center
# Facts closer to the user node are ranked higher
edge_results = await client.search(
query,
center_node_uuid=state['user_node_uuid'],
num_results=5
)
facts_string = edges_to_facts_string(edge_results)
# Build system message with retrieved facts
system_message = SystemMessage(
content=f"""You are a skillful shoe salesperson working for ManyBirds.
Review information about the user and their conversation below.
Keep responses short and concise. Always be selling and helpful!
Things you need to know to close a sale:
- User's shoe size
- Any special needs (wide feet, arch support, etc.)
- Preferred colors and styles
- Budget
Facts about the user:
{facts_string or 'No facts about the user yet'}"""
)
messages = [system_message] + state['messages']
response = await llm.ainvoke(messages)
# Asynchronously persist interaction to Graphiti
asyncio.create_task(
client.add_episode(
name='Chatbot Response',
episode_body=f'{state["user_name"]}: {state["messages"][-1].content}\nSalesBot: {response.content}',
source=EpisodeType.message,
reference_time=datetime.now(timezone.utc),
source_description='Chatbot',
)
)
return {'messages': [response]}
# Initialize LLM with tools
llm = ChatOpenAI(model='gpt-4.1-mini', temperature=0).bind_tools(tools)
tool_node = ToolNode(tools)
# Define conditional logic
async def should_continue(state, config):
last_message = state['messages'][-1]
return 'end' if not last_message.tool_calls else 'continue'
# Build the graph
graph_builder = StateGraph(State)
memory = MemorySaver()
graph_builder.add_node('agent', chatbot)
graph_builder.add_node('tools', tool_node)
graph_builder.add_edge(START, 'agent')
graph_builder.add_conditional_edges(
'agent',
should_continue,
{'continue': 'tools', 'end': END}
)
graph_builder.add_edge('tools', 'agent')
graph = graph_builder.compile(checkpointer=memory)
Run the Agent
import uuid
# Single interaction
response = await graph.ainvoke(
{
'messages': [{
'role': 'user',
'content': 'What sizes do the Wool Runners come in?'
}],
'user_name': user_name,
'user_node_uuid': user_node_uuid,
},
config={'configurable': {'thread_id': uuid.uuid4().hex}}
)
print(response['messages'][-1].content)
Interactive Agent Loop
config = {'configurable': {'thread_id': uuid.uuid4().hex}}
user_state = {'user_name': user_name, 'user_node_uuid': user_node_uuid}
print("Agent: Hello! How can I help you find shoes today?")
while True:
user_input = input("You: ")
if user_input.lower() in ['exit', 'quit', 'bye']:
print("Agent: Thank you for chatting! Goodbye!")
break
graph_state = {
'messages': [{'role': 'user', 'content': user_input}],
'user_name': user_state['user_name'],
'user_node_uuid': user_state['user_node_uuid'],
}
async for event in graph.astream(graph_state, config=config):
for value in event.values():
if 'messages' in value:
last_message = value['messages'][-1]
if isinstance(last_message, AIMessage) and isinstance(last_message.content, str):
print(f"Agent: {last_message.content}")
Key Integration Patterns
1. Context Retrieval
Use Graphiti’s centered search to retrieve relevant facts:
# Center search on user node for personalized context
user_context = await client.search(
query=user_message,
center_node_uuid=user_node_uuid,
num_results=5
)
# Center search on domain entities for factual information
product_info = await client.search(
query=product_query,
center_node_uuid=product_category_uuid,
num_results=10
)
2. Asynchronous Episode Persistence
Avoid blocking agent responses by persisting episodes asynchronously:
import asyncio
# Don't await - fire and forget
asyncio.create_task(
client.add_episode(
name='Interaction',
episode_body=conversation_text,
source=EpisodeType.message,
reference_time=datetime.now(timezone.utc),
)
)
3. Dual Memory Strategy
- LangGraph MemorySaver: Short-term conversation state within a session
- Graphiti: Long-term knowledge persistence across sessions
# LangGraph manages immediate conversation flow
memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)
# Graphiti stores extractable knowledge
await client.add_episode(...) # Persisted facts
4. Multi-Agent Collaboration
Share knowledge between multiple agents via Graphiti:
# Sales agent adds customer preference
await client.add_episode(
name='Customer Preference',
episode_body='Customer prefers blue shoes',
group_id='customer-123',
source=EpisodeType.text,
)
# Support agent retrieves the preference later
context = await client.search(
'customer preferences',
group_id='customer-123',
num_results=5
)
Advanced Patterns
Generate tools based on graph entities:
from langchain_core.tools import tool
def create_category_search_tool(category_name: str, category_uuid: str):
@tool
async def search_category(query: str) -> str:
f"""Search for {category_name} information."""
results = await client.search(
query,
center_node_uuid=category_uuid,
num_results=10
)
return edges_to_facts_string(results)
return search_category
# Create tools for each category
categories = [
('Running Shoes', 'uuid-running'),
('Hiking Boots', 'uuid-hiking'),
('Casual Sneakers', 'uuid-casual'),
]
tools = [create_category_search_tool(name, uuid) for name, uuid in categories]
Temporal Context
Leverage Graphiti’s temporal awareness:
from datetime import timedelta
# Search for recent interactions
recent_time = datetime.now(timezone.utc) - timedelta(days=7)
recent_context = await client.search(
query='customer interactions',
reference_time=recent_time,
num_results=10
)
# System message with temporal context
system_message = SystemMessage(
content=f"""Recent customer activity (last 7 days):
{edges_to_facts_string(recent_context)}
Acknowledge any recent purchases or concerns."""
)
Structured Output Integration
Combine Graphiti with LangChain’s structured output:
from pydantic import BaseModel, Field
from langchain_core.output_parsers import PydanticOutputParser
class CustomerProfile(BaseModel):
shoe_size: int | None = Field(description="Customer's shoe size")
preferred_colors: list[str] = Field(description="Preferred colors")
budget_range: str | None = Field(description="Budget range")
special_needs: list[str] = Field(description="Special requirements")
parser = PydanticOutputParser(pydantic_object=CustomerProfile)
# Extract structured data from graph context
edges = await client.search(f'customer profile for {user_name}', num_results=10)
facts = edges_to_facts_string(edges)
prompt = f"""Extract customer profile from these facts:
{facts}
{parser.get_format_instructions()}
"""
response = await llm.ainvoke(prompt)
profile = parser.parse(response.content)
Visualizing the Knowledge Graph
After interactions, the knowledge graph captures relationships:
// View user's preferences in Neo4j Browser
MATCH (u:Entity {name: 'jess'})-[r:RELATES_TO]->(n)
RETURN u, r, n
LIMIT 25
Example graph structure:
(jess:Entity)-[:INTERESTED_IN]->(Running Shoes:Entity)
(jess)-[:PREFERS]->(Size 10:Entity)
(jess)-[:LIKES]->(Blue Color:Entity)
(Running Shoes)-[:HAS_FEATURE]->(Lightweight:Entity)
(Running Shoes)-[:MANUFACTURED_BY]->(ManyBirds:Entity)
Best Practices
1. Scope Your Searches
Use center_node_uuid to focus retrieval:
# Personalized search centered on user
user_facts = await client.search(
query,
center_node_uuid=user_node_uuid,
num_results=5
)
# Domain search centered on topic
domain_facts = await client.search(
query,
center_node_uuid=domain_node_uuid,
num_results=10
)
2. Use Group IDs for Multi-Tenancy
# Separate graphs per customer
await client.add_episode(
name='Interaction',
episode_body=content,
group_id=f'customer-{customer_id}',
source=EpisodeType.text,
)
# Search within customer's graph
results = await client.search(
query,
group_id=f'customer-{customer_id}',
num_results=10
)
3. Balance Concurrency
Adjust SEMAPHORE_LIMIT based on LLM tier:
# For development/low-tier APIs
export SEMAPHORE_LIMIT=5
# For production/high-tier APIs
export SEMAPHORE_LIMIT=20
4. Implement Error Handling
async def safe_add_episode(content: str):
try:
await client.add_episode(
name='Interaction',
episode_body=content,
source=EpisodeType.text,
reference_time=datetime.now(timezone.utc),
)
except Exception as e:
logger.error(f"Failed to persist episode: {e}")
# Continue agent operation even if persistence fails
Example: Multi-Agent Customer Service
class CustomerServiceState(TypedDict):
messages: Annotated[list, add_messages]
customer_id: str
customer_node_uuid: str
current_agent: str # 'sales', 'support', 'billing'
async def route_to_agent(state: CustomerServiceState):
"""Route to appropriate agent based on intent."""
last_message = state['messages'][-1].content
# Retrieve customer context
context = await client.search(
f"customer {state['customer_id']} {last_message}",
center_node_uuid=state['customer_node_uuid'],
num_results=10
)
# Determine routing based on context
intent_prompt = f"""Given this customer context:
{edges_to_facts_string(context)}
And their message: {last_message}
Route to: sales, support, or billing
"""
routing = await llm.ainvoke(intent_prompt)
return routing.content.lower()
# Build multi-agent graph
graph_builder = StateGraph(CustomerServiceState)
graph_builder.add_node('router', route_to_agent)
graph_builder.add_node('sales', sales_agent)
graph_builder.add_node('support', support_agent)
graph_builder.add_node('billing', billing_agent)
graph_builder.add_edge(START, 'router')
graph_builder.add_conditional_edges(
'router',
lambda s: s['current_agent'],
{
'sales': 'sales',
'support': 'support',
'billing': 'billing'
}
)
LangSmith Integration
Trace agent execution with LangSmith:
import os
# Enable tracing
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_PROJECT'] = 'Graphiti-LangGraph-Agent'
os.environ['LANGCHAIN_API_KEY'] = 'your-langsmith-key'
# Run agent - traces automatically captured
await graph.ainvoke(...)
Resources
Next Steps