Retrieval Augmented Generation (RAG) enhances agents with external knowledge by retrieving relevant documents before generating responses.
Quick Start
The simplest way to add RAG is using Gemini’s built-in File Search:
from pathlib import Path
from vision_agents.core import Agent, User
from vision_agents.plugins import gemini, getstream, deepgram, elevenlabs
# Create file search store
file_search_store = await gemini.create_file_search_store(
name = "product-knowledge" ,
knowledge_dir = Path( "./knowledge" ),
extensions = [ ".md" , ".txt" ],
)
# Create LLM with File Search tool
llm = gemini.LLM(
"gemini-2.5-flash-lite" ,
tools = [gemini.tools.FileSearch(file_search_store)],
)
agent = Agent(
edge = getstream.Edge(),
agent_user = User( id = "ai-agent" , name = "AI" ),
instructions = "Use your knowledge base to answer questions about our products." ,
llm = llm,
tts = elevenlabs.TTS(),
stt = deepgram.STT( eager_turn_detection = True ),
)
Gemini File Search is the easiest option - it handles everything automatically including chunking, embedding, and retrieval.
TurboPuffer Hybrid Search
For more control and better retrieval quality, use TurboPuffer with hybrid search (vector + BM25):
from pathlib import Path
from vision_agents.plugins import turbopuffer, gemini
# Create RAG with hybrid search
rag = await turbopuffer.create_rag(
namespace = "product-knowledge" ,
knowledge_dir = Path( "./knowledge" ),
extensions = [ ".md" ],
region = "gcp-us-central1" ,
)
# Register as a function
llm = gemini.LLM( "gemini-2.5-flash-lite" )
@llm.register_function (
description = "Search knowledge base for detailed product information"
)
async def search_knowledge ( query : str ) -> str :
return await rag.search(query, top_k = 3 )
agent = Agent(
llm = llm,
...
)
Hybrid Search Modes
TurboPuffer supports three search modes:
Hybrid (Default)
Vector Only
BM25 Only
# Combines vector + BM25 using Reciprocal Rank Fusion
results = await rag.search(
"How does chat API work?" ,
top_k = 3 ,
mode = "hybrid" # default
)
Hybrid search combines the best of both:
Vector search : Semantic understanding (“how to send messages” matches “message sending guide”)
BM25 search : Exact keyword matching (“SDK-123” matches exactly “SDK-123”)
Results are merged using Reciprocal Rank Fusion (RRF) for superior retrieval quality.
Phone Agent with RAG
Complete example of a phone agent with knowledge retrieval:
import asyncio
import os
import uuid  # required: twilio_voice_webhook generates call ids with uuid.uuid4()
from pathlib import Path

import uvicorn
from fastapi import Depends, FastAPI, WebSocket

from vision_agents.core import Agent, User
from vision_agents.plugins import (
    deepgram,
    elevenlabs,
    gemini,
    getstream,
    turbopuffer,
    twilio,
)

app = FastAPI()
call_registry = twilio.TwilioCallRegistry()

KNOWLEDGE_DIR = Path(__file__).parent / "knowledge"

rag = None  # Initialized at startup by create_rag_from_directory().


async def create_rag_from_directory():
    """Index every .md file under KNOWLEDGE_DIR into the shared RAG instance."""
    global rag
    rag = await turbopuffer.create_rag(
        namespace="product-knowledge",
        knowledge_dir=KNOWLEDGE_DIR,
        extensions=[".md"],
    )
    print(f"RAG ready with {len(rag.indexed_files)} documents")


async def create_agent() -> Agent:
    """Build an agent whose LLM can query the shared RAG index as a tool."""
    llm = gemini.LLM("gemini-2.5-flash-lite")

    @llm.register_function(
        description="Search product knowledge base for detailed information"
    )
    async def search_knowledge(query: str) -> str:
        return await rag.search(query, top_k=3)

    return Agent(
        edge=getstream.Edge(),
        agent_user=User(id="ai-agent", name="AI"),
        instructions="Answer questions using your knowledge base.",
        tts=elevenlabs.TTS(),
        stt=deepgram.STT(eager_turn_detection=True),
        llm=llm,
    )


@app.post("/twilio/voice")
async def twilio_voice_webhook(
    data: twilio.CallWebhookInput = Depends(twilio.CallWebhookInput.as_form),
):
    """Twilio voice webhook: register the call and answer with a media-stream URL."""
    call_id = str(uuid.uuid4())

    async def prepare_call():
        # Runs while Twilio is still connecting the media stream, so the
        # agent and call are ready by the time audio arrives.
        agent = await create_agent()
        phone_user = User(
            name=f"Call from {data.from_number}",
            id=f"phone-{data.from_number}",
        )
        stream_call = await agent.create_call("default", call_id=call_id)
        return agent, phone_user, stream_call

    twilio_call = call_registry.create(call_id, data, prepare=prepare_call)
    # NOTE(review): raises KeyError if NGROK_URL is unset — intentional fail-fast.
    url = f"wss://{os.environ['NGROK_URL']}/twilio/media/{call_id}/{twilio_call.token}"
    return twilio.create_media_stream_response(url)


@app.websocket("/twilio/media/{call_id}/{token}")
async def media_stream(websocket: WebSocket, call_id: str, token: str):
    """Bridge an accepted Twilio media stream onto the prepared agent call."""
    twilio_call = call_registry.validate(call_id, token)
    twilio_stream = twilio.TwilioMediaStream(websocket)
    await twilio_stream.accept()

    agent, phone_user, stream_call = await twilio_call.await_prepare()
    await twilio.attach_phone_to_call(stream_call, twilio_stream, phone_user.id)

    async with agent.join(stream_call, participant_wait_timeout=0):
        await agent.llm.simple_response(
            "Greet the caller and ask how you can help. Use your knowledge base."
        )
        await twilio_stream.run()


if __name__ == "__main__":
    asyncio.run(create_rag_from_directory())
    uvicorn.run(app, host="localhost", port=8000)
Custom RAG Implementation
Implement custom RAG logic by extending the RAG base class:
from pathlib import Path

from vision_agents.core.rag import RAG, Document


class CustomRAG(RAG):
    """Skeleton for a custom RAG backend (bring your own vector store)."""

    def __init__(self, namespace: str):
        self._namespace = namespace
        # Source paths of every document indexed so far.
        self._indexed_files: list[str] = []

    async def add_documents(self, documents: list[Document]) -> int:
        """
        Add documents to the index.

        Args:
            documents: List of Document(text, source)

        Returns:
            Number of chunks indexed
        """
        for doc in documents:
            # Your indexing logic here: chunk, embed, store.
            self._indexed_files.append(doc.source)
        return len(documents)

    async def search(self, query: str, top_k: int = 3) -> str:
        """
        Search the knowledge base.

        Args:
            query: Search query
            top_k: Number of results

        Returns:
            Formatted string with search results
        """
        # Your search logic here: populate with (source, text) pairs.
        results: list[tuple[str, str]] = []
        # Return a formatted string for LLM context.
        return "\n\n".join(
            f"[{i}] From {source}:\n{text}"
            for i, (source, text) in enumerate(results, 1)
        )

    async def close(self) -> None:
        """Release backend resources; nothing to clean up in this skeleton."""
Document Structure
Organize your knowledge base:
knowledge/
├── products/
│ ├── chat-api.md
│ ├── video-api.md
│ └── feeds-api.md
├── guides/
│ ├── getting-started.md
│ └── authentication.md
└── faq.md
Documents are automatically chunked based on the RAG configuration:
# Chunking behavior is configured on the RAG instance.
rag = turbopuffer.TurboPufferRAG(
    namespace="knowledge",
    chunk_size=10000,    # characters per chunk
    chunk_overlap=200,   # overlap keeps context continuous across chunks
)
Cache Warming
Warm the cache after indexing for low-latency queries:
rag = await turbopuffer.create_rag(
namespace = "knowledge" ,
knowledge_dir = "./knowledge" ,
)
# Cache is automatically warmed after indexing
# Or manually:
await rag.warm_cache()
Reciprocal Rank Fusion
TurboPuffer uses RRF to combine vector and BM25 results:
from collections import defaultdict


def reciprocal_rank_fusion(
    ranked_lists: list[list[tuple[str, float]]],
    k: int = 60,
) -> list[tuple[str, float]]:
    """
    Combine multiple ranked lists using RRF.

    Args:
        ranked_lists: List of ranked results [(id, score), ...]
        k: RRF constant (default 60, as per original paper)

    Returns:
        Fused ranking [(id, rrf_score), ...] sorted descending
    """
    rrf_scores: defaultdict[str, float] = defaultdict(float)
    for ranked_list in ranked_lists:
        # Only the rank matters; the original scores are deliberately ignored.
        for rank, (doc_id, _) in enumerate(ranked_list, start=1):
            rrf_scores[doc_id] += 1.0 / (k + rank)
    return sorted(rrf_scores.items(), key=lambda item: item[1], reverse=True)
Production Best Practices
Choose the Right Backend
Gemini File Search : Easiest, fully managed, good for most use cases
TurboPuffer : More control, hybrid search, better for complex queries
Custom : Maximum flexibility, integrate your own vector DB
Optimize Chunking
rag = turbopuffer.TurboPufferRAG(
    chunk_size=10000,   # larger chunks give each result more context
    chunk_overlap=200,  # overlap avoids losing context at chunk boundaries
)
Use Hybrid Search
Hybrid search (default) provides better results than vector-only:
# Automatically uses hybrid search
results = await rag.search(query, top_k=3)
Monitor Retrieval Quality
import logging

logger = logging.getLogger(__name__)  # was undefined in the original snippet


@llm.register_function(description="Search knowledge base")
async def search_knowledge(query: str) -> str:
    """Search the knowledge base, logging each query to monitor retrieval."""
    results = await rag.search(query, top_k=3)
    # Lazy %-style args avoid formatting the message when INFO is disabled.
    logger.info("RAG query: %s", query)
    return results
Environment Variables
# TurboPuffer
TURBO_PUFFER_KEY=your_api_key
# Google (for Gemini embeddings or File Search)
GOOGLE_API_KEY=your_api_key
Complete Example
See examples/03_phone_and_rag_example/inbound_phone_and_rag_example.py for a complete working example.
Next Steps