Graphiti is designed for high-performance knowledge graph operations. This guide covers configuration, optimization, and scaling strategies for production deployments.
Concurrency Control
The most critical performance setting is SEMAPHORE_LIMIT, which controls concurrent episode processing.
Understanding SEMAPHORE_LIMIT
Graphiti’s ingestion pipelines are highly concurrent. SEMAPHORE_LIMIT determines how many episodes can be processed simultaneously. Each episode involves multiple LLM calls:
- Entity extraction (2-3 calls)
- Entity deduplication (1-2 calls)
- Fact extraction (2-3 calls)
- Summarization (1-2 calls)
Actual concurrent LLM requests = SEMAPHORE_LIMIT × 6-10
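As a rough illustration of the arithmetic above, the sketch below multiplies a given SEMAPHORE_LIMIT by the 6-10 calls-per-episode range from the list. The function name and the constants table are hypothetical helpers written for this guide, not part of graphiti_core.

```python
# Per-episode LLM call ranges taken from the list above: (low, high).
CALLS_PER_EPISODE = {
    "entity_extraction": (2, 3),
    "entity_deduplication": (1, 2),
    "fact_extraction": (2, 3),
    "summarization": (1, 2),
}


def estimate_concurrent_llm_requests(semaphore_limit: int) -> tuple[int, int]:
    """Return the (low, high) estimate of simultaneous LLM requests."""
    low = sum(lo for lo, _ in CALLS_PER_EPISODE.values())
    high = sum(hi for _, hi in CALLS_PER_EPISODE.values())
    return semaphore_limit * low, semaphore_limit * high


print(estimate_concurrent_llm_requests(20))  # (120, 200)
```

With the core library default of 20, the estimate is 120 to 200 requests in flight, which is the number to compare against your provider's rate limit.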
Default Configuration
# In graphiti_core/helpers.py
SEMAPHORE_LIMIT = int(os.getenv('SEMAPHORE_LIMIT', 20))
The core library defaults to 20, while the MCP server ships with a more conservative default of 10:
# MCP server default (more conservative)
SEMAPHORE_LIMIT=10
# Core library default
SEMAPHORE_LIMIT=20
Tuning by LLM Provider
OpenAI
# Tier 1 (free): 3 RPM → very limited
export SEMAPHORE_LIMIT=1
# Tier 2: 60 RPM
export SEMAPHORE_LIMIT=5
# Tier 3: 500 RPM (most common paid tier)
export SEMAPHORE_LIMIT=10
# Tier 4: 5,000 RPM
export SEMAPHORE_LIMIT=30
# Tier 5: 10,000+ RPM
export SEMAPHORE_LIMIT=50
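If you prefer to pick the value programmatically, the OpenAI tiers above can be encoded as a lookup table. This is only a sketch: the table mirrors the numbers listed above, and the function name is hypothetical rather than anything Graphiti provides.

```python
from bisect import bisect_right

# (requests per minute, suggested SEMAPHORE_LIMIT), copied from the tiers above.
OPENAI_RPM_TO_LIMIT = [(3, 1), (60, 5), (500, 10), (5000, 30), (10000, 50)]


def suggested_semaphore_limit(rpm: int, table=OPENAI_RPM_TO_LIMIT) -> int:
    """Pick the limit of the highest tier whose RPM does not exceed `rpm`."""
    thresholds = [tier_rpm for tier_rpm, _ in table]
    idx = bisect_right(thresholds, rpm) - 1
    if idx < 0:
        return 1  # below the lowest tier: process one episode at a time
    return table[idx][1]


print(suggested_semaphore_limit(500))  # 10
```

Treat the result as a starting point and adjust it based on the 429 rate you observe.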
Anthropic
# Default tier: 50 RPM
export SEMAPHORE_LIMIT=5
# Mid tier: 500 RPM
export SEMAPHORE_LIMIT=15
# High tier: 1,000+ RPM
export SEMAPHORE_LIMIT=30
Azure OpenAI
# Check your quota in Azure Portal
# Start conservative and monitor
export SEMAPHORE_LIMIT=10
# Scale up gradually based on quota
export SEMAPHORE_LIMIT=20 # For higher quotas
Ollama (Local LLM)
# Depends on hardware (CPU/GPU)
export SEMAPHORE_LIMIT=2 # Conservative for CPU-only
export SEMAPHORE_LIMIT=5 # For GPU acceleration
# Monitor resource usage and adjust
Groq
# Groq offers very high throughput
export SEMAPHORE_LIMIT=30
# Can go higher with premium access
export SEMAPHORE_LIMIT=50
Symptoms of Misconfiguration
Too High:
- 429 rate limit errors in logs
- Increased API costs from retries
- Memory pressure from queued operations
- Inconsistent response times
Too Low:
- Slow episode ingestion
- Underutilized API quota
- Poor throughput
- Long processing queues
Monitoring and Adjustment
import logging
import time
from graphiti_core.llm_client.errors import RateLimitError  # import path assumed; verify against your graphiti_core version
logger = logging.getLogger('graphiti')
# Monitor episode processing time
start = time.perf_counter()
await graphiti.add_episode(...)
elapsed = time.perf_counter() - start
logger.info(f"Episode processed in {elapsed:.2f}s")
# Track 429 errors
try:
    await graphiti.add_episode(...)
except RateLimitError as e:
    logger.warning(f"Rate limit hit: {e}")
    # Consider lowering SEMAPHORE_LIMIT
Dynamic Adjustment
Adjust concurrency at runtime:
from graphiti_core import helpers
# Lower concurrency during high load
helpers.SEMAPHORE_LIMIT = 5
# Increase during off-peak
helpers.SEMAPHORE_LIMIT = 20
Database Optimization
Memory Configuration
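Edit neo4j.conf. The setting names below follow Neo4j 4.x; Neo4j 5 renamed several memory settings, so check the configuration reference for your version: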
# Heap size (general rule: 50% of available RAM, max 32GB)
dbms.memory.heap.initial_size=4G
dbms.memory.heap.max_size=4G
# Page cache (remaining RAM after heap)
dbms.memory.pagecache.size=4G
# Transaction state
dbms.memory.transaction.global_max_size=2G
dbms.memory.transaction.max_size=1G
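The sizing rule in the comments above (heap at 50% of RAM capped at 32GB, page cache from what remains) can be sketched as a small calculation. The function and its `os_reserve_gb` parameter are hypothetical; the reserve defaults to 0 to match the rule as stated, although in practice you will likely want to leave some memory for the operating system.

```python
def neo4j_memory_plan(total_ram_gb: float, heap_cap_gb: float = 32.0,
                      os_reserve_gb: float = 0.0) -> dict:
    """Split RAM between heap and page cache using the 50% / 32GB rule."""
    usable = total_ram_gb - os_reserve_gb
    if usable <= 0:
        raise ValueError("os_reserve_gb must be smaller than total_ram_gb")
    heap = min(usable * 0.5, heap_cap_gb)
    page_cache = usable - heap  # whatever remains after the heap
    return {"heap_gb": heap, "pagecache_gb": page_cache}


print(neo4j_memory_plan(8))    # {'heap_gb': 4.0, 'pagecache_gb': 4.0}
print(neo4j_memory_plan(128))  # {'heap_gb': 32.0, 'pagecache_gb': 96.0}
```

The 8GB case reproduces the 4G heap and 4G page cache values shown in the configuration above.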
Index Configuration
Create optimal indices:
// Vector indices for embeddings
CREATE VECTOR INDEX entity_embedding IF NOT EXISTS
FOR (n:Entity)
ON n.name_embedding
OPTIONS {
indexConfig: {
`vector.dimensions`: 1536,
`vector.similarity_function`: 'cosine'
}
};
// Fulltext search
CREATE FULLTEXT INDEX entity_search IF NOT EXISTS
FOR (n:Entity)
ON EACH [n.name, n.summary];
// Property indices
CREATE INDEX entity_uuid IF NOT EXISTS FOR (n:Entity) ON (n.uuid);
CREATE INDEX entity_group_id IF NOT EXISTS FOR (n:Entity) ON (n.group_id);
CREATE INDEX entity_created_at IF NOT EXISTS FOR (n:Entity) ON (n.created_at);
Query Optimization
Use query plans to identify bottlenecks:
// Profile a search query
PROFILE
MATCH (n:Entity {group_id: $group_id})
WHERE n.name CONTAINS $query
RETURN n
LIMIT 10;
// Look for:
// - Db Hits (lower is better)
// - Index usage (should use indices)
// - Estimated Rows (accuracy)
Connection Pooling
from graphiti_core.driver.neo4j_driver import Neo4jDriver
from neo4j import AsyncGraphDatabase
# Configure pool size
driver = AsyncGraphDatabase.driver(
uri="bolt://localhost:7687",
auth=("neo4j", "password"),
max_connection_pool_size=50, # Default: 100
connection_acquisition_timeout=60, # Seconds
)
neo4j_driver = Neo4jDriver(
uri="bolt://localhost:7687",
user="neo4j",
password="password"
)
neo4j_driver.client = driver
Redis Configuration
Optimize Redis for FalkorDB:
# redis.conf
# Memory
maxmemory 8gb
maxmemory-policy allkeys-lru
# Persistence (adjust based on durability needs)
save 900 1
save 300 10
save 60 10000
# Network
tcp-backlog 511
timeout 0
tcp-keepalive 300
Graph-Specific Settings
from graphiti_core.driver.falkordb_driver import FalkorDriver
driver = FalkorDriver(
host="localhost",
port=6379,
database="graphiti"
)
# FalkorDB automatically optimizes queries
# No manual index creation needed
File System Optimization
from graphiti_core.driver.kuzu_driver import KuzuDriver
# Use fast storage (SSD)
driver = KuzuDriver(
db="/mnt/nvme/graphiti.kuzu", # SSD path
max_concurrent_queries=4 # Adjust based on CPU cores
)
Memory vs Disk Trade-off
# In-memory for speed (loses data on restart)
driver = KuzuDriver(db=":memory:")
# Persistent storage
driver = KuzuDriver(db="/path/to/persistent.kuzu")
Chunking Configuration
Graphiti automatically chunks large episodes to avoid LLM context limits.
Chunking Parameters
# Content chunking (from graphiti_core/helpers.py)
export CHUNK_TOKEN_SIZE=3000 # Default: 3000 tokens per chunk
export CHUNK_OVERLAP_TOKENS=200 # Default: 200 token overlap
export CHUNK_MIN_TOKENS=1000 # Minimum size before chunking
# Entity density threshold
# Chunk if: elements per 1000 tokens > threshold * 1000
export ENTITY_DENSITY_THRESHOLD=0.15
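To make the parameters concrete, here is a minimal sketch of overlapping window chunking and of the density rule from the comment above. It is not Graphiti's implementation: both function names are hypothetical, and tokens are represented as a plain list rather than real tokenizer output.

```python
def chunk_tokens(tokens: list, chunk_size: int = 3000, overlap: int = 200) -> list:
    """Split tokens into windows of chunk_size that share `overlap` tokens."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the input
    return chunks


def exceeds_density(element_count: int, token_count: int,
                    threshold: float = 0.15) -> bool:
    """Apply the rule: elements per 1000 tokens > threshold * 1000."""
    if token_count <= 0:
        return False
    per_thousand = element_count / token_count * 1000
    return per_thousand > threshold * 1000


print(chunk_tokens(list(range(10)), chunk_size=4, overlap=1))
# [[0, 1, 2, 3], [3, 4, 5, 6], [6, 7, 8, 9]]
```

Each window starts `chunk_size - overlap` tokens after the previous one, so a larger overlap means more chunks, and therefore more LLM calls, for the same input.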
Tuning Guidance
Large documents:
# Process large documents faster (smaller chunks)
export CHUNK_TOKEN_SIZE=2000
export CHUNK_OVERLAP_TOKENS=100
Dense entity extraction:
# More context for entity-rich content
export CHUNK_TOKEN_SIZE=4000
export CHUNK_OVERLAP_TOKENS=300
Cost optimization:
# Larger chunks = fewer LLM calls = lower cost
export CHUNK_TOKEN_SIZE=4000
# But may hit context limits on some models
Batch Embeddings
Graphiti batches embedding requests by default:
# In your code, embeddings are automatically batched
await graphiti.add_episode(...) # Internally batches embeddings
Choose Faster Embedding Models
from graphiti_core import Graphiti
from graphiti_core.embedder.openai import OpenAIEmbedder, OpenAIEmbedderConfig
# Faster, smaller model
embedder = OpenAIEmbedder(
config=OpenAIEmbedderConfig(
embedding_model="text-embedding-3-small",
embedding_dim=1536
)
)
graphiti = Graphiti(
"bolt://localhost:7687",
"neo4j",
"password",
embedder=embedder
)
Local Embeddings
Use local models to eliminate network latency:
from graphiti_core.embedder.sentence_transformers import (
SentenceTransformerEmbedder,
SentenceTransformerConfig
)
# Fast local embeddings
embedder = SentenceTransformerEmbedder(
config=SentenceTransformerConfig(
model="all-MiniLM-L6-v2", # Very fast, decent quality
embedding_dim=384
)
)
# Or higher quality
embedder = SentenceTransformerEmbedder(
config=SentenceTransformerConfig(
model="all-mpnet-base-v2", # Slower, better quality
embedding_dim=768
)
)
Limit Result Counts
# Faster searches with fewer results
results = await graphiti.search(
query="user preferences",
num_results=5 # Default: 10, lower is faster
)
Use Centered Searches
# More efficient with center node
results = await graphiti.search(
query="product info",
center_node_uuid=user_node_uuid, # Focuses search
num_results=10
)
Optimize Search Configuration
from graphiti_core.search.search_config import SearchConfig
# Custom search config
config = SearchConfig(
num_episodes=3, # Fewer episodes = faster
num_results=5,
max_facts=50, # Limit fact retrieval
reranker_weight=0.5
)
results = await graphiti.search(
query="test",
config=config
)
Parallel Processing
Enable Parallel Runtime
# Enable parallel processing (experimental)
export USE_PARALLEL_RUNTIME=true
Warning: This is experimental and may cause issues with some LLM providers.
Batch Episode Ingestion
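import asyncio
from datetime import datetime, timezone
from graphiti_core.nodes import EpisodeType
# Process episodes in parallel (respects SEMAPHORE_LIMIT)
episodes = [
    {"name": f"Episode {i}", "content": f"Content {i}"}
    for i in range(100)
]
tasks = [
    graphiti.add_episode(
        name=ep["name"],
        episode_body=ep["content"],
        source=EpisodeType.text,
        source_description="bulk import",  # placeholder description
        reference_time=datetime.now(timezone.utc),
    )
    for ep in episodes
]
# Concurrent execution (limited by SEMAPHORE_LIMIT)
await asyncio.gather(*tasks)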
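For intuition about how a semaphore caps concurrency in a gather call like the one above, here is a standalone sketch using only asyncio. `bounded_gather` is a hypothetical helper written for this guide; it illustrates the general technique and is not a copy of Graphiti's internals.

```python
import asyncio


async def bounded_gather(coros, limit: int):
    """Run coroutines concurrently, with at most `limit` running at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(coro):
        async with semaphore:  # wait here while `limit` tasks are active
            return await coro

    # gather preserves input order in its result list
    return await asyncio.gather(*(run(c) for c in coros))


async def demo():
    async def work(i):
        await asyncio.sleep(0.01)  # stand-in for an LLM or database call
        return i * 2

    return await bounded_gather([work(i) for i in range(5)], limit=2)


print(asyncio.run(demo()))  # [0, 2, 4, 6, 8]
```

All tasks are created up front, but only `limit` of them hold the semaphore at any moment; the rest stay queued until a slot frees up.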
Caching Strategies
LLM Response Caching
Some providers support prompt caching:
# Anthropic prompt caching
from graphiti_core import Graphiti
from graphiti_core.llm_client.anthropic_client import AnthropicClient, LLMConfig
llm_client = AnthropicClient(
    config=LLMConfig(
        model="claude-sonnet-4-20250514",
        cache_system_messages=True # Enable caching
    )
)
graphiti = Graphiti(
"bolt://localhost:7687",
"neo4j",
"password",
llm_client=llm_client
)
Application-Level Caching
import hashlib
class CachedGraphiti:
    def __init__(self, graphiti):
        self.graphiti = graphiti
        self._search_cache = {}  # Unbounded; entries are never invalidated
    async def cached_search(self, query: str, num_results: int = 10):
        cache_key = hashlib.md5(f"{query}:{num_results}".encode()).hexdigest()
        if cache_key in self._search_cache:
            return self._search_cache[cache_key]
        results = await self.graphiti.search(query, num_results=num_results)
        self._search_cache[cache_key] = results
        return results
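A plain dictionary cache can return stale results once new episodes are ingested. One possible variation, sketched below, expires entries after a time-to-live and caps the number of entries. `TTLSearchCache` and its parameters are hypothetical; it wraps any async search callable that accepts `query` and `num_results`, which is an assumption about how you call your search function.

```python
import asyncio
import time


class TTLSearchCache:
    """Cache async search results for ttl_seconds, up to max_entries keys."""

    def __init__(self, search_fn, ttl_seconds: float = 60.0, max_entries: int = 1024):
        self._search_fn = search_fn
        self._ttl = ttl_seconds
        self._max_entries = max_entries
        self._entries = {}  # (query, num_results) -> (expires_at, results)

    async def search(self, query: str, num_results: int = 10):
        key = (query, num_results)
        now = time.monotonic()
        hit = self._entries.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]  # fresh entry: skip the backend call
        self._entries.pop(key, None)  # drop an expired entry, if any
        results = await self._search_fn(query, num_results=num_results)
        if len(self._entries) >= self._max_entries:
            # dicts keep insertion order, so this evicts the oldest entry
            self._entries.pop(next(iter(self._entries)))
        self._entries[key] = (now + self._ttl, results)
        return results

    def invalidate(self) -> None:
        """Clear everything, for example right after ingesting new episodes."""
        self._entries.clear()
```

You would construct it as `TTLSearchCache(graphiti.search)` and call `invalidate()` after writes if stale reads are unacceptable for your workload.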
Monitoring and Profiling
Enable Logging
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('graphiti_core')
logger.setLevel(logging.DEBUG)
Track Metrics
import time
import statistics
class PerformanceTracker:
    def __init__(self):
        self.episode_times = []
        self.search_times = []
    async def timed_add_episode(self, graphiti, **kwargs):
        start = time.time()
        await graphiti.add_episode(**kwargs)
        elapsed = time.time() - start
        self.episode_times.append(elapsed)
        return elapsed
    def report(self):
        times = self.episode_times
        if not times:
            return {"total_episodes": 0}
        return {
            "avg_episode_time": statistics.mean(times),
            # quantiles() needs at least two samples; fall back to the single value
            "p95_episode_time": statistics.quantiles(times, n=20)[18] if len(times) > 1 else times[0],
            "total_episodes": len(times)
        }
tracker = PerformanceTracker()
await tracker.timed_add_episode(graphiti, name="Test", episode_body="Content")
print(tracker.report())
OpenTelemetry Integration
See examples/opentelemetry/ for full instrumentation:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# Set up tracing
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
trace.get_tracer_provider().add_span_processor(
SimpleSpanProcessor(ConsoleSpanExporter())
)
# Instrument Graphiti operations
with tracer.start_as_current_span("add_episode"):
    await graphiti.add_episode(...)
Production Deployment
Horizontal Scaling
Deploy multiple Graphiti instances:
# Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: graphiti-api
spec:
  replicas: 5 # Scale horizontally
  selector: # Required by apps/v1; must match the pod template labels
    matchLabels:
      app: graphiti-api
  template:
    metadata:
      labels:
        app: graphiti-api
    spec:
      containers:
        - name: graphiti
          image: zepai/graphiti:latest
          env:
            - name: SEMAPHORE_LIMIT
              value: "15" # Lower per instance
            - name: NEO4J_URI
              value: "bolt://neo4j-cluster:7687" # Shared DB
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
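Because every replica applies its own SEMAPHORE_LIMIT, the load on a shared LLM quota scales with the replica count. The sketch below shows that arithmetic and one way to split a global budget across instances; both helper names are hypothetical.

```python
def aggregate_episode_concurrency(limit_per_instance: int, replicas: int) -> int:
    """Total episodes that can be in flight across all replicas."""
    return limit_per_instance * replicas


def per_instance_limit(total_budget: int, replicas: int) -> int:
    """Divide a global concurrency budget evenly, never going below 1."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return max(1, total_budget // replicas)


# The deployment above: 5 replicas with SEMAPHORE_LIMIT=15 each.
print(aggregate_episode_concurrency(15, 5))  # 75
# Splitting a global budget of 30 concurrent episodes over 5 replicas.
print(per_instance_limit(30, 5))  # 6
```

If 75 concurrent episodes would exceed your provider's rate limit, lower the per-instance value instead of relying on retries.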
Database Clustering
Neo4j Cluster
# Neo4j Causal Cluster
services:
  neo4j-core-1:
    image: neo4j:5.26-enterprise
    environment:
      - NEO4J_dbms_mode=CORE
      - NEO4J_causal__clustering_initial__discovery__members=neo4j-core-1:5000,neo4j-core-2:5000,neo4j-core-3:5000
  neo4j-core-2:
    # ... similar config
  neo4j-core-3:
    # ... similar config
Load Balancing
# Round-robin across database replicas
from itertools import cycle
from graphiti_core.driver.neo4j_driver import Neo4jDriver
neo4j_uris = [
    "bolt://neo4j-1:7687",
    "bolt://neo4j-2:7687",
    "bolt://neo4j-3:7687",
]
uri_cycle = cycle(neo4j_uris)
def get_driver():
    uri = next(uri_cycle)
    return Neo4jDriver(uri=uri, user="neo4j", password="password")
Typical performance on modern hardware:
| Operation | Avg Time | P95 Time | Notes |
|---|---|---|---|
| Add Episode (short) | 2-5s | 8s | SEMAPHORE_LIMIT=10 |
| Add Episode (long) | 8-15s | 25s | With chunking |
| Search (5 results) | 200-500ms | 1s | With indices |
| Search (20 results) | 500ms-1s | 2s | With reranking |
| Bulk ingest (100 episodes) | 30-60s | 90s | Parallel |
Hardware: 8-core CPU, 16GB RAM, SSD, OpenAI Tier 3
Troubleshooting
High Memory Usage
Symptoms: Memory grows unbounded
Solutions:
- Lower SEMAPHORE_LIMIT
- Reduce CHUNK_TOKEN_SIZE
- Enable database connection pooling
- Clear episode queue periodically
Slow Ingestion
Symptoms: Episodes take > 30s to process
Solutions:
- Increase SEMAPHORE_LIMIT (if not hitting rate limits)
- Use a faster embedding model
- Reduce chunking overhead
- Check database index health
Rate Limit Errors
Symptoms: 429 errors in logs
Solutions:
- Lower SEMAPHORE_LIMIT
- Implement exponential backoff
- Upgrade LLM provider tier
- Switch to local models (Ollama)
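Exponential backoff from the list above could look roughly like the following. This is a generic asyncio sketch, not something Graphiti provides: `retry_with_backoff`, `make_call`, and `is_rate_limit` are hypothetical names, and you supply the predicate that recognizes your client's rate limit exception.

```python
import asyncio
import random


async def retry_with_backoff(make_call, is_rate_limit, max_attempts: int = 5,
                             base_delay: float = 1.0, max_delay: float = 30.0):
    """Retry an async call on rate limit errors with exponential backoff.

    make_call: zero-argument function returning a fresh coroutine per attempt.
    is_rate_limit: predicate that returns True for retryable exceptions.
    """
    for attempt in range(max_attempts):
        try:
            return await make_call()
        except Exception as exc:
            last_attempt = attempt == max_attempts - 1
            if not is_rate_limit(exc) or last_attempt:
                raise  # not retryable, or out of attempts
            # The delay ceiling doubles each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: sleep a random amount up to the ceiling.
            await asyncio.sleep(random.uniform(0, ceiling))
```

A call site might look like `await retry_with_backoff(lambda: graphiti.add_episode(...), lambda e: isinstance(e, RateLimitError))`. The jitter spreads retries out so that concurrent episodes do not all retry at the same instant.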