Documentation Index
Fetch the complete documentation index at: https://mintlify.com/getzep/graphiti/llms.txt
Use this file to discover all available pages before exploring further.
Graphiti uses a pluggable driver architecture that allows you to integrate any graph database backend. This guide walks through implementing a custom driver from scratch.
Driver Architecture Overview
Graphiti’s driver layer is organized into three tiers:
1. GraphDriver ABC
The core interface (graphiti_core/driver/driver.py) that every backend must implement:
from graphiti_core.driver.driver import GraphDriver, GraphProvider
from abc import abstractmethod
from typing import Any, Coroutine
class GraphDriver(ABC):
provider: GraphProvider
@abstractmethod
def execute_query(self, cypher_query: str, **kwargs: Any) -> Coroutine:
"""Execute a query against the graph database."""
raise NotImplementedError()
@abstractmethod
def session(self, database: str | None = None) -> GraphDriverSession:
"""Create a new database session."""
raise NotImplementedError()
@abstractmethod
async def build_indices_and_constraints(self, delete_existing: bool = False):
"""Build database indices and constraints."""
raise NotImplementedError()
@abstractmethod
def close(self):
"""Close the database connection."""
raise NotImplementedError()
2. GraphProvider Enum
Identifies the backend in query builders:
from enum import Enum
class GraphProvider(Enum):
NEO4J = 'neo4j'
FALKORDB = 'falkordb'
KUZU = 'kuzu'
NEPTUNE = 'neptune'
# Add your backend here
3. Operations Interfaces
Eleven abstract base classes covering all CRUD and search operations:
- Node operations:
EntityNodeOperations, EpisodeNodeOperations, CommunityNodeOperations, SagaNodeOperations
- Edge operations:
EntityEdgeOperations, EpisodicEdgeOperations, CommunityEdgeOperations, HasEpisodeEdgeOperations, NextEpisodeEdgeOperations
- Search & maintenance:
SearchOperations, GraphMaintenanceOperations
Building a Custom Driver
Step 1: Add to GraphProvider Enum
Edit graphiti_core/driver/driver.py:
class GraphProvider(Enum):
NEO4J = 'neo4j'
FALKORDB = 'falkordb'
KUZU = 'kuzu'
NEPTUNE = 'neptune'
MY_BACKEND = 'my_backend' # Add your backend
Step 2: Create Directory Structure
mkdir -p graphiti_core/driver/my_backend/operations
touch graphiti_core/driver/my_backend/__init__.py
touch graphiti_core/driver/my_backend/operations/__init__.py
touch graphiti_core/driver/my_backend_driver.py
Step 3: Implement the Driver Class
Create graphiti_core/driver/my_backend_driver.py:
import logging
from typing import Any
from graphiti_core.driver.driver import GraphDriver, GraphDriverSession, GraphProvider
from graphiti_core.driver.my_backend.operations import (
MyBackendEntityNodeOperations,
MyBackendEpisodeNodeOperations,
MyBackendCommunityNodeOperations,
MyBackendSagaNodeOperations,
MyBackendEntityEdgeOperations,
MyBackendEpisodicEdgeOperations,
MyBackendCommunityEdgeOperations,
MyBackendHasEpisodeEdgeOperations,
MyBackendNextEpisodeEdgeOperations,
MyBackendSearchOperations,
MyBackendGraphMaintenanceOperations,
)
logger = logging.getLogger(__name__)
class MyBackendDriver(GraphDriver):
provider = GraphProvider.MY_BACKEND
aoss_client: None = None # Set if using external search (like Neptune)
def __init__(
self,
host: str = 'localhost',
port: int = 7687,
username: str | None = None,
password: str | None = None,
database: str = 'default_db',
):
super().__init__()
self._database = database
# Initialize your database client
self.client = MyBackendClient(
host=host,
port=port,
username=username,
password=password
)
# Instantiate all 11 operation classes
self._entity_node_ops = MyBackendEntityNodeOperations()
self._episode_node_ops = MyBackendEpisodeNodeOperations()
self._community_node_ops = MyBackendCommunityNodeOperations()
self._saga_node_ops = MyBackendSagaNodeOperations()
self._entity_edge_ops = MyBackendEntityEdgeOperations()
self._episodic_edge_ops = MyBackendEpisodicEdgeOperations()
self._community_edge_ops = MyBackendCommunityEdgeOperations()
self._has_episode_edge_ops = MyBackendHasEpisodeEdgeOperations()
self._next_episode_edge_ops = MyBackendNextEpisodeEdgeOperations()
self._search_ops = MyBackendSearchOperations()
self._graph_ops = MyBackendGraphMaintenanceOperations()
# Expose operations via properties
@property
def entity_node_ops(self):
return self._entity_node_ops
@property
def episode_node_ops(self):
return self._episode_node_ops
# ... implement all 11 properties
@property
def search_ops(self):
return self._search_ops
@property
def graph_ops(self):
return self._graph_ops
# Implement abstract methods
def execute_query(self, cypher_query: str, **kwargs: Any):
"""Execute a query against the database."""
return self.client.execute(cypher_query, **kwargs)
def session(self, database: str | None = None):
"""Create a database session."""
db = database or self._database
return MyBackendDriverSession(self.client, db)
async def build_indices_and_constraints(self, delete_existing: bool = False):
"""Build indices and constraints."""
if delete_existing:
await self.delete_all_indexes()
# Get index queries from shared builders
from graphiti_core.graph_queries import get_fulltext_indices, get_range_indices
fulltext_indices = get_fulltext_indices(self.provider)
range_indices = get_range_indices(self.provider)
async with self.session() as session:
for query in fulltext_indices + range_indices:
await session.run(query)
async def delete_all_indexes(self):
"""Delete all indices."""
# Implementation depends on your database
async with self.session() as session:
await session.run("DROP ALL INDEXES")
def close(self):
"""Close the database connection."""
self.client.close()
Step 4: Implement GraphDriverSession
Create a session class for connection management:
class MyBackendDriverSession(GraphDriverSession):
provider = GraphProvider.MY_BACKEND
def __init__(self, client, database: str):
self.client = client
self.database = database
self._session = client.create_session(database)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
await self.close()
async def run(self, query: str, **kwargs: Any) -> Any:
"""Execute a query in this session."""
return await self._session.execute(query, **kwargs)
async def close(self):
"""Close the session."""
await self._session.close()
async def execute_write(self, func, *args, **kwargs):
"""Execute a write transaction."""
return await func(self, *args, **kwargs)
Step 5: Implement Operations
Implement all 11 operation classes. Here’s an example of EntityNodeOperations:
# graphiti_core/driver/my_backend/operations/entity_node_ops.py
from graphiti_core.driver.operations.entity_node_ops import EntityNodeOperations
from graphiti_core.driver.query_executor import QueryExecutor, Transaction
from graphiti_core.nodes import EntityNode
from graphiti_core.models.nodes.node_db_queries import build_entity_node_upsert_query
class MyBackendEntityNodeOperations(EntityNodeOperations):
async def save(
self,
executor: QueryExecutor,
node: EntityNode,
tx: Transaction | None = None,
) -> None:
"""Save an entity node to the database."""
# Use shared query builder
from graphiti_core.driver.driver import GraphProvider
query = build_entity_node_upsert_query(GraphProvider.MY_BACKEND)
params = {
'uuid': node.uuid,
'name': node.name,
'group_id': node.group_id,
'labels': node.labels,
'created_at': node.created_at,
'summary': node.summary,
'name_embedding': node.name_embedding,
}
if tx:
await tx.run(query, **params)
else:
async with executor.session() as session:
await session.run(query, **params)
async def get_by_uuid(
self,
executor: QueryExecutor,
uuid: str,
) -> EntityNode:
"""Retrieve an entity node by UUID."""
query = """
MATCH (n:Entity {uuid: $uuid})
RETURN n
"""
async with executor.session() as session:
result = await session.run(query, uuid=uuid)
# Parse result into EntityNode
return self._parse_entity_node(result)
def _parse_entity_node(self, result) -> EntityNode:
"""Parse database result into EntityNode."""
# Implementation depends on result format
node_data = result[0]['n']
return EntityNode(
uuid=node_data['uuid'],
name=node_data['name'],
group_id=node_data['group_id'],
labels=node_data['labels'],
created_at=node_data['created_at'],
summary=node_data.get('summary'),
name_embedding=node_data.get('name_embedding'),
)
# Implement remaining methods:
# - save_bulk
# - delete
# - delete_by_group_id
# - delete_by_uuids
# - get_by_uuids
# - get_by_group_ids
# - load_embeddings
# - load_embeddings_bulk
Step 6: Add Query Variants
Add database-specific query builders in graphiti_core/models/nodes/node_db_queries.py:
def build_entity_node_upsert_query(provider: GraphProvider) -> str:
"""Build entity node upsert query for different providers."""
match provider:
case GraphProvider.NEO4J:
return """
MERGE (n:Entity {uuid: $uuid})
ON CREATE SET
n.name = $name,
n.group_id = $group_id,
n.labels = $labels,
n.created_at = $created_at
ON MATCH SET
n.name = $name,
n.summary = $summary,
n.name_embedding = $name_embedding
"""
case GraphProvider.MY_BACKEND:
return """
-- Your database-specific syntax
INSERT INTO entities (uuid, name, group_id, labels, created_at, summary)
VALUES ($uuid, $name, $group_id, $labels, $created_at, $summary)
ON CONFLICT (uuid) DO UPDATE SET
name = EXCLUDED.name,
summary = EXCLUDED.summary
"""
case _:
raise NotImplementedError(f"Unsupported provider: {provider}")
Do the same for edge queries in graphiti_core/models/edges/edge_db_queries.py.
Step 7: Register Optional Dependency
Add to pyproject.toml:
[project.optional-dependencies]
my_backend = [
"my-backend-client>=1.0.0",
]
Step 8: Export from init.py
Create graphiti_core/driver/my_backend/operations/__init__.py:
from graphiti_core.driver.my_backend.operations.entity_node_ops import (
MyBackendEntityNodeOperations,
)
from graphiti_core.driver.my_backend.operations.episode_node_ops import (
MyBackendEpisodeNodeOperations,
)
# ... export all 11 operation classes
__all__ = [
'MyBackendEntityNodeOperations',
'MyBackendEpisodeNodeOperations',
# ... all 11 classes
]
Real-World Examples
Neo4j Driver (Full-Featured Reference)
The Neo4j driver is the most straightforward reference implementation:
from graphiti_core.driver.neo4j_driver import Neo4jDriver
driver = Neo4jDriver(
uri="bolt://localhost:7687",
user="neo4j",
password="password",
database="neo4j"
)
Key features:
- Full transaction support
- Native Cypher queries
- Vector index support
- Constraint management
See: graphiti_core/driver/neo4j_driver.py and graphiti_core/driver/neo4j/operations/
FalkorDB Driver (Lightweight Alternative)
FalkorDB is Redis-based with a different query approach:
from graphiti_core.driver.falkordb_driver import FalkorDriver
driver = FalkorDriver(
host="localhost",
port=6379,
username=None,
password=None,
database="default_db"
)
Key differences:
- Redis-based connection
- Custom fulltext query syntax (
@ prefix)
- Stopwords filtering
- No native transaction support
See: graphiti_core/driver/falkordb_driver.py and graphiti_core/driver/falkordb/operations/
Kuzu Driver (Embedded Database)
Kuzu demonstrates an embedded, in-process database:
from graphiti_core.driver.kuzu_driver import KuzuDriver
driver = KuzuDriver(
db="/tmp/graphiti.kuzu", # File path or :memory:
max_concurrent_queries=1
)
Key features:
- Embedded database (no server)
- Explicit schema definition
- Custom edge representation (RelatesToNode_)
- Schema creation in init
Schema definition in kuzu_driver.py:
SCHEMA_QUERIES = """
CREATE NODE TABLE IF NOT EXISTS Entity (
uuid STRING PRIMARY KEY,
name STRING,
group_id STRING,
labels STRING[],
created_at TIMESTAMP,
name_embedding FLOAT[],
summary STRING
);
-- ... more tables
"""
See: graphiti_core/driver/kuzu_driver.py and graphiti_core/driver/kuzu/operations/
Neptune Driver (Cloud Backend with External Search)
Neptune shows integration with external search (OpenSearch):
from graphiti_core.driver.neptune_driver import NeptuneDriver
driver = NeptuneDriver(
host="neptune-endpoint.amazonaws.com",
aoss_host="opensearch-endpoint.amazonaws.com",
port=8182,
aoss_port=443
)
Key features:
- Cloud-managed graph database
- External OpenSearch for fulltext
- AWS IAM authentication
- Separate search backend
See: graphiti_core/driver/neptune_driver.py and graphiti_core/driver/neptune/operations/
Usage Pattern
Once implemented, use your driver with Graphiti:
from graphiti_core import Graphiti
from graphiti_core.driver.my_backend_driver import MyBackendDriver
# Create driver instance
driver = MyBackendDriver(
host="localhost",
port=7687,
username="user",
password="password",
database="my_graph"
)
# Pass to Graphiti
graphiti = Graphiti(graph_driver=driver)
# Build indices
await graphiti.build_indices_and_constraints()
# Use normally
await graphiti.add_episode(
name="Test Episode",
episode_body="This is a test",
source=EpisodeType.text
)
results = await graphiti.search("test query")
Testing Your Driver
Create tests in tests/driver/test_my_backend_driver.py:
import pytest
from graphiti_core.driver.my_backend_driver import MyBackendDriver
from graphiti_core.nodes import EntityNode
from datetime import datetime, timezone
@pytest.fixture
async def driver():
driver = MyBackendDriver(
host="localhost",
port=7687,
database="test_db"
)
await driver.build_indices_and_constraints(delete_existing=True)
yield driver
await driver.delete_all_indexes()
driver.close()
@pytest.mark.asyncio
async def test_entity_node_save_and_retrieve(driver):
"""Test saving and retrieving an entity node."""
node = EntityNode(
uuid="test-uuid",
name="Test Entity",
group_id="test-group",
labels=["Person"],
created_at=datetime.now(timezone.utc)
)
# Save node
await driver.entity_node_ops.save(driver, node)
# Retrieve node
retrieved = await driver.entity_node_ops.get_by_uuid(driver, "test-uuid")
assert retrieved.uuid == node.uuid
assert retrieved.name == node.name
assert retrieved.group_id == node.group_id
Best Practices
1. Use Query Builders
Leverage shared query builders instead of hardcoding queries:
from graphiti_core.models.nodes.node_db_queries import build_entity_node_upsert_query
query = build_entity_node_upsert_query(self.provider)
2. Handle Transactions Properly
Implement transaction support if your database supports it:
from contextlib import asynccontextmanager
from graphiti_core.driver.query_executor import Transaction
@asynccontextmanager
async def transaction(self):
session = self.session()
tx = await session.begin_transaction()
try:
yield tx
await tx.commit()
except Exception:
await tx.rollback()
raise
finally:
await session.close()
3. Parse Results Consistently
Create helper methods for result parsing:
def _parse_entity_node(self, record) -> EntityNode:
"""Parse database record to EntityNode."""
# Consistent parsing logic
pass
4. Handle Database Dialects
Account for syntax differences in your query builders:
match provider:
case GraphProvider.NEO4J:
return "MERGE (n:Entity {uuid: $uuid})"
case GraphProvider.MY_BACKEND:
return "INSERT INTO entities ..."
5. Document Limitations
Clearly document any limitations:
class MyBackendDriver(GraphDriver):
"""
Driver for MyBackend graph database.
Limitations:
- No native vector index support (uses approximation)
- Transactions limited to single session
- Maximum 10,000 nodes per query
"""
Connection Pooling
class MyBackendDriver(GraphDriver):
def __init__(self, host, port, pool_size=10):
self.client = MyBackendClient(
host=host,
port=port,
max_pool_size=pool_size # Connection pooling
)
Batch Operations
Implement efficient bulk operations:
async def save_bulk(
self,
executor: QueryExecutor,
nodes: list[EntityNode],
batch_size: int = 100,
) -> None:
"""Save nodes in batches for efficiency."""
for i in range(0, len(nodes), batch_size):
batch = nodes[i:i + batch_size]
query = build_bulk_upsert_query(self.provider)
params = {'nodes': [n.model_dump() for n in batch]}
async with executor.session() as session:
await session.run(query, **params)
Index Optimization
Create appropriate indices:
async def build_indices_and_constraints(self, delete_existing=False):
indices = [
"CREATE INDEX entity_uuid ON Entity(uuid)",
"CREATE INDEX entity_group_id ON Entity(group_id)",
"CREATE VECTOR INDEX entity_embedding ON Entity(name_embedding)",
]
async with self.session() as session:
for index in indices:
await session.run(index)
Troubleshooting
Driver Not Found
Error: ImportError: cannot import name 'MyBackendDriver'
Solution: Ensure your driver is installed:
pip install graphiti-core[my_backend]
Query Syntax Errors
Error: SyntaxError: Invalid query syntax
Solution: Add your provider to query builders:
case GraphProvider.MY_BACKEND:
return "your-database-specific-query"
Missing Operations
Error: AttributeError: 'MyBackendDriver' object has no attribute 'entity_node_ops'
Solution: Implement all 11 operation properties:
@property
def entity_node_ops(self):
return self._entity_node_ops
Contributing Your Driver
To contribute your driver to the Graphiti project:
- Fork the repository
- Implement your driver following this guide
- Add comprehensive tests
- Update documentation
- Submit a pull request
See CONTRIBUTING.md for details.
Resources
Next Steps