This guide walks through implementing a new RAG architecture in the benchmark, from design to evaluation.

Overview

The benchmark currently includes six RAG architectures:
  • Simple Semantic RAG: Direct vector similarity matching
  • Hybrid RAG: BM25 + Semantic ensemble retrieval
  • Hybrid RAG + RRF: Reciprocal Rank Fusion
  • HyDE RAG: Hypothetical document embeddings
  • Query Rewriter RAG: Multi-query reformulation
  • PageIndex RAG: Page-aware retrieval
You can add new architectures by following the established patterns.

Implementation Requirements

Required Function Signature

Every RAG implementation must provide a query_for_evaluation() function:
def query_for_evaluation(
    question: str,
    llm_model: Optional[str] = None,
    custom_llm: Optional[BaseChatModel] = None
) -> dict:
    """
    Process a question and return results for evaluation.
    
    Args:
        question: The question to process
        llm_model: Model name string (e.g., "gpt-4o")
        custom_llm: Pre-configured language model instance
        
    Returns:
        dict: {
            "question": str,
            "answer": str,
            "contexts": List[str],
            "metadata": dict
        }
    """

Return Dictionary Structure

The function must return:
{
    "question": "The input question",
    "answer": "Generated answer text",
    "contexts": ["context1", "context2", ...],  # Retrieved text chunks
    "source_documents": [Document, ...],         # Optional: full Document objects
    "metadata": {
        "num_contexts": int,
        "retrieval_method": str,
        "llm_model": str,
        "provider": str,
        "model_id": str,
        "execution_time": float,
        "input_tokens": int,
        "output_tokens": int,
        "total_cost": float,
        "tokens_used": int,
        "usage_source": str,
        "cost_source": str
    }
}

Step-by-Step Implementation

Step 1: Create New RAG Module

Create a new file in src/rag/ for your RAG implementation:
touch src/rag/my_rag.py
Start with imports and basic setup:
"""
My Novel RAG Strategy - Brief description of your approach.

This module implements [describe your strategy and what makes it unique].
"""

import os
import time
from pathlib import Path
from typing import List, Dict, Any, Optional

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.language_models import BaseChatModel
from langchain_core.prompts import ChatPromptTemplate

from src.common.model_provider import get_model_identity
from src.common.usage_metrics import extract_usage_from_ai_message, extract_cost_from_ai_message
from src.common.pricing import resolve_total_cost

# Environment configuration
PROJECT_ROOT = Path(__file__).resolve().parents[2]
ENV_PATH = PROJECT_ROOT / ".env"
load_dotenv(dotenv_path=ENV_PATH)

if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OPENAI_API_KEY not found in .env file")

Step 2: Configure Models and Vector Store

Set up the required components:
# Define paths
chroma_db_dir = PROJECT_ROOT / "data" / "embeddings" / "chroma_db"
collection_name = "guia_embarazo_parto"

# Configure models
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
llm = ChatOpenAI(model_name="gpt-4o", temperature=0)

# Load vector store
vectorstore = Chroma(
    persist_directory=str(chroma_db_dir),
    embedding_function=embeddings,
    collection_name=collection_name,
)

# Configure retriever (customize based on your strategy)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

Step 3: Define Prompt Templates

Create prompts specific to your strategy:
# Example: QA prompt for answer generation
qa_template = """
You are a medical expert specializing in pregnancy and childbirth.
Your task is to analyze the provided medical context and answer 
the user's question accurately and concisely.

STRICT INSTRUCTIONS:
1. Base your answer exclusively on the MEDICAL CONTEXT section.
2. The context is ordered by relevance. Prioritize early documents.
3. Provide a direct and integrated answer in a single paragraph.
4. If insufficient information exists, state that clearly.
5. Always answer in Spanish.

MEDICAL CONTEXT (ordered by relevance):
{context}

QUESTION: {question}

DETAILED MEDICAL ANSWER:
"""
qa_prompt = ChatPromptTemplate.from_template(qa_template)

Step 4: Implement Core Processing Function

Create the main processing function for your strategy:
def format_docs(docs: List[Document]) -> str:
    """Format retrieved documents for the prompt."""
    formatted_docs = []
    for i, doc in enumerate(docs):
        source = doc.metadata.get('source', 'N/A')
        page = doc.metadata.get('page_number', 'N/A')
        formatted_doc = f"""--- Document {i+1} ---
Source: {source}, Page: {page}
Content: {doc.page_content}"""
        formatted_docs.append(formatted_doc)
    return "\n\n".join(formatted_docs)


def process_my_rag_query(
    query: str,
    custom_llm: Optional[BaseChatModel] = None
) -> Dict[str, Any]:
    """
    Process a query using your novel RAG strategy.
    
    Args:
        query: The user's question
        custom_llm: Optional custom LLM to use
        
    Returns:
        Dictionary with answer, contexts, and metrics
    """
    # 1. Implement your retrieval strategy here
    # Example: standard retrieval
    retrieved_docs = retriever.invoke(query)
    
    # 2. Format context
    formatted_context = format_docs(retrieved_docs)
    
    # 3. Generate answer
    current_llm = custom_llm if custom_llm else llm
    response = current_llm.invoke(qa_prompt.format_messages(
        context=formatted_context,
        question=query
    ))
    
    # 4. Extract usage metrics
    usage = extract_usage_from_ai_message(response)
    provider_cost = extract_cost_from_ai_message(response)
    
    # 5. Return results
    return {
        'answer': response.content,
        'contexts': [doc.page_content for doc in retrieved_docs],
        'retrieved_documents': retrieved_docs,
        'metrics': {
            'input_tokens': int(usage['input_tokens']),
            'output_tokens': int(usage['output_tokens']),
            'total_tokens': int(usage['total_tokens']),
            'usage_source': str(usage['usage_source']),
            'cost': float(provider_cost['total_cost']) if provider_cost['total_cost'] is not None else 0.0,
            'cost_source': str(provider_cost['cost_source'])
        }
    }

Step 5: Implement Evaluation Wrapper

Create the required query_for_evaluation() function:
def query_for_evaluation(
    question: str,
    llm_model: Optional[str] = None,
    custom_llm: Optional[BaseChatModel] = None
) -> dict:
    """
    Wrapper function for RAG evaluation frameworks like RAGAS.
    
    This function ensures compatibility with the evaluation pipeline.
    
    Args:
        question: The question to process
        llm_model: Model name string
        custom_llm: Pre-configured language model
        
    Returns:
        Dictionary structured for evaluation
    """
    start_time = time.time()
    
    # Determine which LLM to use
    if custom_llm:
        result = process_my_rag_query(question, custom_llm)
        model_identity = get_model_identity(llm=custom_llm)
    elif llm_model:
        custom_llm_instance = ChatOpenAI(model_name=llm_model, temperature=0)
        result = process_my_rag_query(question, custom_llm_instance)
        model_identity = get_model_identity(model_name=llm_model, llm=custom_llm_instance)
    else:
        result = process_my_rag_query(question)
        model_identity = get_model_identity(model_name="gpt-4o", llm=llm)
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    # Resolve costs
    input_tokens = result["metrics"]["input_tokens"]
    output_tokens = result["metrics"]["output_tokens"]
    resolved_cost = resolve_total_cost(
        provider=model_identity["provider"],
        model_name=model_identity["model_name"],
        model_id=model_identity["model_id"],
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        provider_reported_cost=result["metrics"]["cost"],
        provider_cost_source=result["metrics"]["cost_source"],
        execution_time_seconds=execution_time,
    )
    
    return {
        "question": question,
        "answer": result["answer"],
        "contexts": result["contexts"],
        "source_documents": result["retrieved_documents"],
        "metadata": {
            "num_contexts": len(result["contexts"]),
            "retrieval_method": "my_novel_strategy",
            "llm_model": model_identity["model_name"],
            "provider": model_identity["provider"],
            "model_id": model_identity["model_id"],
            "embedding_model": "text-embedding-3-small",
            "execution_time": execution_time,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "total_cost": resolved_cost["total_cost"],
            "tokens_used": input_tokens + output_tokens,
            "usage_source": result["metrics"]["usage_source"],
            "cost_source": resolved_cost["cost_source"],
        }
    }

Step 6: Integrate with Evaluator

Register your RAG architecture in src/evaluation/ragas_evaluator.py:
# Add import at top of file
from src.rag.my_rag import query_for_evaluation as my_rag_query_for_evaluation

# In RAGASEvaluator.__init__() method, add new case:
elif rag_type.lower() == "my-rag":
    self.query_function = my_rag_query_for_evaluation
    self.rag_name = "My Novel RAG Strategy"
    self.rag_type = "my-rag"
    self.llm_model = "gpt-4o"
Also add evaluation helper function:
def evaluate_my_rag(export_analysis: bool = False, debug: bool = False):
    """Evaluate My Novel RAG specifically"""
    evaluator = RAGASEvaluator(rag_type="my-rag", debug=debug)
    results = evaluator.run_evaluation()
    
    if export_analysis:
        try:
            from src.common.utils import export_ragas_analysis
            performance_metadata = getattr(evaluator, 'performance_metadata', None)
            export_files = export_ragas_analysis(
                results, "my_rag", performance_metadata=performance_metadata
            )
            print("\nDetailed analysis exported:")
            for file_type, file_path in export_files.items():
                print(f"  {file_type}: {file_path.name}")
        except Exception as e:
            print(f"Error exporting analysis: {e}")
    
    return results

Step 7: Update CLI Script

Add your RAG to the evaluation script in scripts/run_evaluation.py:
# Add 'my-rag' to the argument parser choices
parser.add_argument(
    'rag_type',
    nargs='?',
    choices=['simple', 'hybrid', 'hyde', 'rewriter', 'my-rag', ...],
    help='RAG architecture to evaluate'
)

# Add evaluation case
elif args.rag_type == 'my-rag':
    from src.evaluation.ragas_evaluator import evaluate_my_rag
    evaluate_my_rag(export_analysis=args.export, debug=args.debug)

Testing Your Implementation

Unit Testing

Create a test file to verify basic functionality:
# tests/test_my_rag.py
import pytest
from src.rag.my_rag import query_for_evaluation

def test_query_for_evaluation():
    """Test basic query processing"""
    result = query_for_evaluation(
        "¿Cuál es la cantidad ideal de controles prenatales?"
    )
    
    assert "question" in result
    assert "answer" in result
    assert "contexts" in result
    assert "metadata" in result
    assert len(result["contexts"]) > 0
    assert result["metadata"]["retrieval_method"] == "my_novel_strategy"

Interactive Testing

Run your implementation directly:
python src/rag/my_rag.py
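Running the module directly assumes a small main guard at the bottom of my_rag.py; the skeleton above does not include one, so a minimal sketch (the sample question is illustrative) could look like this:
if __name__ == "__main__":
    # Quick smoke test when the module is executed directly
    sample_question = "¿Cuál es la cantidad ideal de controles prenatales?"
    result = query_for_evaluation(sample_question)
    print(f"Answer: {result['answer']}")
    print(f"Contexts retrieved: {result['metadata']['num_contexts']}")
    print(f"Total cost: {result['metadata']['total_cost']}")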

Evaluation Testing

Run a full evaluation:
python scripts/run_evaluation.py my-rag

Evaluation and Analysis

Single Model Evaluation

# Evaluate with default model
python scripts/run_evaluation.py my-rag

# With debug output
python scripts/run_evaluation.py my-rag --debug

Multi-Model Comparison

# Test across all models
python scripts/run_evaluation.py multi-model my-rag

Comprehensive Benchmark

# Compare against all existing RAG architectures
python scripts/run_evaluation.py all-models-all-rags

Performance Considerations

Optimization Tips

  1. Cache Embeddings: Reuse embeddings when possible
  2. Batch Processing: Process multiple queries together
  3. Async Operations: Use async/await for parallel API calls (see the sketch after this list)
  4. Connection Pooling: Reuse HTTP connections
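
A minimal sketch combining tips 2 and 3, assuming a hypothetical answer_batch() helper built on the module-level retriever, qa_prompt, format_docs, and llm defined in Steps 2-4:
import asyncio
from typing import List

async def answer_batch(questions: List[str]) -> List[str]:
    """Hypothetical helper: answer several questions concurrently."""
    async def answer_one(question: str) -> str:
        docs = await retriever.ainvoke(question)
        messages = qa_prompt.format_messages(
            context=format_docs(docs), question=question
        )
        response = await llm.ainvoke(messages)
        return response.content

    # Fire all retrieval + generation calls in parallel instead of sequentially
    return await asyncio.gather(*(answer_one(q) for q in questions))

# Usage: answers = asyncio.run(answer_batch(["pregunta 1", "pregunta 2"]))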

Monitoring Costs

The framework automatically tracks:
  • Input/output tokens per query
  • Cost per query and total cost
  • Execution time
View costs in evaluation results:
{
  "metadata": {
    "total_cost": 0.0234,
    "average_cost_per_question": 0.00234
  }
}

Example: Semantic Reranking RAG

Here’s a partial example that adds semantic reranking on top of the base retriever; only the retrieval step changes, and the rest of the processing mirrors Step 4:
from langchain.retrievers import ContextualCompressionRetriever
# CohereRerank requires the cohere package and a COHERE_API_KEY environment
# variable; in newer LangChain releases it is imported from langchain_cohere.
from langchain.retrievers.document_compressors import CohereRerank

# Initialize reranker
compressor = CohereRerank(model="rerank-english-v2.0", top_n=3)

# Create compression retriever
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=retriever
)

def process_rerank_query(query: str, custom_llm: ChatOpenAI = None):
    # Use compression retriever instead of base retriever
    retrieved_docs = compression_retriever.invoke(query)
    # ... rest of processing

Next Steps

  • Integrating Models: Test your RAG with different LLMs
  • Customizing Metrics: Add custom evaluation metrics
  • API Reference: Explore the complete API
  • Contributing: Contribute your implementation
