Skip to main content
Retrieval Augmented Generation (RAG) enhances agents with external knowledge by retrieving relevant documents before generating responses.

Quick Start

The simplest way to add RAG is using Gemini’s built-in File Search:
# NOTE(review): top-level `await` only works inside an async context
# (e.g. an `async def main()` run via `asyncio.run`) — this snippet assumes one.
from pathlib import Path
from vision_agents.core import Agent, User
from vision_agents.plugins import gemini, getstream, deepgram, elevenlabs

# Create file search store
# Indexes every .md/.txt file under ./knowledge into a managed Gemini store.
file_search_store = await gemini.create_file_search_store(
    name="product-knowledge",
    knowledge_dir=Path("./knowledge"),
    extensions=[".md", ".txt"],
)

# Create LLM with File Search tool
# The FileSearch tool lets the model retrieve from the store at generation time.
llm = gemini.LLM(
    "gemini-2.5-flash-lite",
    tools=[gemini.tools.FileSearch(file_search_store)],
)

agent = Agent(
    edge=getstream.Edge(),
    agent_user=User(id="ai-agent", name="AI"),
    instructions="Use your knowledge base to answer questions about our products.",
    llm=llm,
    tts=elevenlabs.TTS(),
    stt=deepgram.STT(eager_turn_detection=True),
)
Gemini File Search is the easiest option - it handles everything automatically including chunking, embedding, and retrieval.
For more control and better retrieval quality, use TurboPuffer with hybrid search (vector + BM25):
from pathlib import Path
from vision_agents.plugins import turbopuffer, gemini

# Create RAG with hybrid search
# NOTE(review): top-level `await` requires an async context (async def / REPL).
rag = await turbopuffer.create_rag(
    namespace="product-knowledge",
    knowledge_dir=Path("./knowledge"),
    extensions=[".md"],
    region="gcp-us-central1",
)

# Register as a function
llm = gemini.LLM("gemini-2.5-flash-lite")

@llm.register_function(
    description="Search knowledge base for detailed product information"
)
async def search_knowledge(query: str) -> str:
    # The registered function becomes a tool the LLM can invoke mid-conversation.
    return await rag.search(query, top_k=3)

agent = Agent(
    llm=llm,
    ...  # remaining Agent kwargs elided — see the Quick Start example above
)

Hybrid Search Modes

TurboPuffer supports three search modes:
# Combines vector + BM25 using Reciprocal Rank Fusion
results = await rag.search(
    "How does chat API work?",
    top_k=3,       # number of fused results to return
    mode="hybrid"  # default
)
Hybrid search combines the best of both:
  • Vector search: Semantic understanding (“how to send messages” matches “message sending guide”)
  • BM25 search: Exact keyword matching (“SDK-123” matches exactly “SDK-123”)
Results are merged using Reciprocal Rank Fusion (RRF) for superior retrieval quality.

Phone Agent with RAG

Complete example of a phone agent with knowledge retrieval:
import asyncio
import os
import uuid
from pathlib import Path

import uvicorn
from fastapi import FastAPI, WebSocket, Depends

from vision_agents.core import User, Agent
from vision_agents.plugins import (
    getstream,
    gemini,
    twilio,
    elevenlabs,
    deepgram,
    turbopuffer,
)

app = FastAPI()
call_registry = twilio.TwilioCallRegistry()

KNOWLEDGE_DIR = Path(__file__).parent / "knowledge"
rag = None  # Initialized at startup

async def create_rag_from_directory():
    """Build the module-level RAG index from the files in KNOWLEDGE_DIR."""
    global rag
    # Index every .md file under the knowledge directory into TurboPuffer.
    built = await turbopuffer.create_rag(
        namespace="product-knowledge",
        knowledge_dir=KNOWLEDGE_DIR,
        extensions=[".md"],
    )
    rag = built
    print(f"RAG ready with {len(rag.indexed_files)} documents")

async def create_agent() -> Agent:
    """Construct a voice agent whose LLM can query the shared RAG index."""
    llm = gemini.LLM("gemini-2.5-flash-lite")

    @llm.register_function(
        description="Search product knowledge base for detailed information"
    )
    async def search_knowledge(query: str) -> str:
        # Delegates to the module-level `rag` built at startup.
        return await rag.search(query, top_k=3)

    agent = Agent(
        edge=getstream.Edge(),
        agent_user=User(id="ai-agent", name="AI"),
        instructions="Answer questions using your knowledge base.",
        tts=elevenlabs.TTS(),
        stt=deepgram.STT(eager_turn_detection=True),
        llm=llm,
    )
    return agent

@app.post("/twilio/voice")
async def twilio_voice_webhook(
    data: twilio.CallWebhookInput = Depends(twilio.CallWebhookInput.as_form),
):
    """Handle Twilio's inbound-call webhook.

    Registers the call, schedules agent preparation, and returns TwiML that
    points Twilio's media stream at our token-protected websocket endpoint.
    """
    # BUG FIX: `uuid` was used here without being imported, so the first
    # inbound call raised NameError. `import uuid` is now in the module imports.
    call_id = str(uuid.uuid4())

    async def prepare_call():
        # Presumably run by the registry while Twilio connects the media
        # stream — TODO confirm against TwilioCallRegistry semantics.
        agent = await create_agent()
        phone_user = User(
            name=f"Call from {data.from_number}",
            id=f"phone-{data.from_number}"
        )
        stream_call = await agent.create_call("default", call_id=call_id)
        return agent, phone_user, stream_call

    twilio_call = call_registry.create(call_id, data, prepare=prepare_call)
    # Websocket URL the TwiML response tells Twilio to stream audio to.
    url = f"wss://{os.environ['NGROK_URL']}/twilio/media/{call_id}/{twilio_call.token}"
    return twilio.create_media_stream_response(url)

@app.websocket("/twilio/media/{call_id}/{token}")
async def media_stream(websocket: WebSocket, call_id: str, token: str):
    """Receive Twilio's media stream and bridge it into the agent call."""
    # Presumably rejects streams whose token doesn't match the one minted in
    # the webhook — TODO confirm validate()'s failure behavior.
    twilio_call = call_registry.validate(call_id, token)
    twilio_stream = twilio.TwilioMediaStream(websocket)
    await twilio_stream.accept()
    
    # Block until the webhook's prepare_call() has built the agent and call.
    agent, phone_user, stream_call = await twilio_call.await_prepare()
    await twilio.attach_phone_to_call(stream_call, twilio_stream, phone_user.id)
    
    # participant_wait_timeout=0: presumably skips waiting for other
    # participants since the caller is already attached via the phone stream.
    async with agent.join(stream_call, participant_wait_timeout=0):
        await agent.llm.simple_response(
            "Greet the caller and ask how you can help. Use your knowledge base."
        )
        await twilio_stream.run()  # pump audio until the call ends

if __name__ == "__main__":
    # Index the knowledge base once before serving requests.
    # NOTE(review): asyncio.run and uvicorn.run each create their own event
    # loop; the RAG client is created on the first loop — confirm it is
    # loop-agnostic before copying this pattern.
    asyncio.run(create_rag_from_directory())
    uvicorn.run(app, host="localhost", port=8000)

Custom RAG Implementation

Implement custom RAG logic by extending the RAG base class:
from vision_agents.core.rag import RAG, Document
from pathlib import Path

class CustomRAG(RAG):
    """Minimal in-memory RAG backend illustrating the `RAG` interface.

    Stores (source, text) pairs in a list and ranks them at query time by
    naive query-term frequency. Replace the bodies of `add_documents` and
    `search` with a real chunk/embed/store pipeline for production use.
    """

    def __init__(self, namespace: str):
        self._namespace = namespace
        self._indexed_files: list[str] = []
        # Parallel store of (source, text) so search() has something to rank.
        self._documents: list[tuple[str, str]] = []

    async def add_documents(self, documents: list[Document]) -> int:
        """
        Add documents to the index.

        Args:
            documents: List of Document(text, source)

        Returns:
            Number of chunks indexed (one per document here — this toy
            implementation performs no chunking)
        """
        for doc in documents:
            # A real backend would chunk, embed, and store here.
            self._indexed_files.append(doc.source)
            self._documents.append((doc.source, doc.text))
        return len(documents)

    async def search(self, query: str, top_k: int = 3) -> str:
        """
        Search the knowledge base.

        Args:
            query: Search query
            top_k: Number of results

        Returns:
            Formatted string with search results, one "[i] From source:"
            block per hit, ready to drop into the LLM context
        """
        terms = query.lower().split()

        def relevance(entry: tuple[str, str]) -> int:
            # Simple relevance proxy: total occurrences of all query terms.
            _, text = entry
            lowered = text.lower()
            return sum(lowered.count(term) for term in terms)

        # BUG FIX: the original body referenced an undefined `results`
        # variable and raised NameError; rank the stored documents instead.
        results = sorted(self._documents, key=relevance, reverse=True)[:top_k]
        return "\n\n".join([
            f"[{i}] From {source}:\n{text}"
            for i, (source, text) in enumerate(results, 1)
        ])

    async def close(self) -> None:
        """Release resources; nothing to clean up for the in-memory store."""
        pass

Document Structure

Organize your knowledge base:
knowledge/
├── products/
│   ├── chat-api.md
│   ├── video-api.md
│   └── feeds-api.md
├── guides/
│   ├── getting-started.md
│   └── authentication.md
└── faq.md
Documents are automatically chunked based on the RAG configuration:
rag = turbopuffer.TurboPufferRAG(
    namespace="knowledge",
    chunk_size=10000,      # Characters per chunk
    chunk_overlap=200,     # Overlap for context continuity
)

Cache Warming

Warm the cache after indexing for low-latency queries:
# NOTE(review): top-level `await` requires an async context (async def / REPL).
rag = await turbopuffer.create_rag(
    namespace="knowledge",
    knowledge_dir="./knowledge",
)

# Cache is automatically warmed after indexing
# Or manually:
await rag.warm_cache()

Reciprocal Rank Fusion

TurboPuffer uses RRF to combine vector and BM25 results:
def reciprocal_rank_fusion(
    ranked_lists: list[list[tuple[str, float]]],
    k: int = 60,
) -> list[tuple[str, float]]:
    """
    Combine multiple ranked lists using Reciprocal Rank Fusion (RRF).

    Each document receives 1 / (k + rank) from every list it appears in
    (ranks are 1-based), and documents are returned sorted by summed score.

    Args:
        ranked_lists: List of ranked results [(id, score), ...]. Input
            scores are ignored; only positions (ranks) matter.
        k: RRF constant (default 60, as per original paper). Larger k
            flattens the influence of top ranks.

    Returns:
        Fused ranking [(id, rrf_score), ...] sorted descending. Ties keep
        first-seen order because sorted() is stable.
    """
    # BUG FIX: the original snippet used `defaultdict` without importing it.
    from collections import defaultdict

    rrf_scores: dict[str, float] = defaultdict(float)

    for ranked_list in ranked_lists:
        # start=1: the top hit contributes 1 / (k + 1).
        for rank, (doc_id, _) in enumerate(ranked_list, start=1):
            rrf_scores[doc_id] += 1.0 / (k + rank)

    return sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True)

Production Best Practices

1. Choose the Right Backend

  • Gemini File Search: Easiest, fully managed, good for most use cases
  • TurboPuffer: More control, hybrid search, better for complex queries
  • Custom: Maximum flexibility, integrate your own vector DB
2. Optimize Chunking

rag = turbopuffer.TurboPufferRAG(
    chunk_size=10000,    # Larger chunks = more context per result
    chunk_overlap=200,   # Overlap prevents context loss at boundaries
)
3. Use Hybrid Search

Hybrid search (default) provides better results than vector-only:
# Automatically uses hybrid search
results = await rag.search(query, top_k=3)
4. Monitor Retrieval Quality

@llm.register_function(description="Search knowledge base")
async def search_knowledge(query: str) -> str:
    """Tool: search the RAG index and return formatted results for the LLM."""
    # Log before awaiting so the query is recorded even if retrieval raises.
    # NOTE(review): `logger` must exist at module scope, e.g.
    # logger = logging.getLogger(__name__).
    logger.info(f"RAG query: {query}")
    results = await rag.search(query, top_k=3)
    return results

Environment Variables

# TurboPuffer
TURBO_PUFFER_KEY=your_api_key

# Google (for Gemini embeddings or File Search)
GOOGLE_API_KEY=your_api_key

Complete Example

See examples/03_phone_and_rag_example/inbound_phone_and_rag_example.py for a complete working example.

Next Steps

Build docs developers (and LLMs) love