DSPy-Opt organizes retrieval-augmented generation as a five-stage pipeline with a fixed execution order. Stages 1, 2, 3, and 5 are composable DSPy modules with learnable prompts, so optimizers can tune them automatically and improve retrieval quality and answer accuracy without manual prompt engineering; stage 4 is a plain retrieval call with nothing to learn. The pipeline accepts a plain-text question and returns a structured prediction containing the original question, the final answer, the chain-of-thought reasoning, the rewritten query, the generated sub-queries, and the deduplicated list of retrieved passages.

Pipeline Overview

The five stages execute sequentially inside FreshQARAG.forward(). Retrieval in stage 4 fans out across the main rewritten query and every sub-query, then collapses back into a single deduplicated passage list that feeds stage 5.

question
    │
    ▼
[1] QueryRewriter        → rewritten_query
    │
    ▼
[2] SubQueryGenerator    → [sub_query_1, sub_query_2, …]
    │
    ▼
[3] MetadataExtractor    → {title: …, category: …}  (per query)
    │
    ▼
[4] WeaviateRetriever    → passages[]  (main + sub-queries, aggregated)
    │
    ▼
[5] dspy.ChainOfThought  → answer + reasoning

1. Query Rewriting

The raw user question is passed to QueryRewriter, which uses dspy.ChainOfThought(QueryRewriteSignature) by default (switchable to dspy.Predict via use_chain_of_thought=False) to produce a search-optimised string. The signature instructs the model to expand the query with relevant synonyms and concepts, clarify ambiguous terms, remove conversational noise such as “I want” or “looking for”, preserve key entities and numerical constraints, and keep the result between 5 and 15 words.
class QueryRewriteSignature(dspy.Signature):
    """Rewrite user queries to optimize for information retrieval."""

    original_query = dspy.InputField(
        desc="User's original search query. Must be rewritten to improve search effectiveness "
        "without altering core intent. Focus on: "
        "- Expanding with relevant synonyms/concepts "
        "- Clarifying ambiguous terms "
        "- Removing noise words "
        "- Maintaining conciseness "
        "- Preserving key entities and constraints"
    )
    rewritten_query = dspy.OutputField(
        desc="Optimized query string ready for search engine. Must: "
        "- Be 5-15 words long "
        "- Contain only essential search terms "
        "- Exclude explanatory phrases like 'I want' or 'looking for' "
        "- Include expanded concepts where helpful "
        "- Preserve numerical constraints and key entities "
        "- Output ONLY the rewritten query string with no additional text"
    )


class QueryRewriter(dspy.Module):
    def __init__(self, use_chain_of_thought: bool = True):
        super().__init__()
        if use_chain_of_thought:
            self.rewriter = dspy.ChainOfThought(QueryRewriteSignature)
        else:
            self.rewriter = dspy.Predict(QueryRewriteSignature)

    def forward(self, query: str) -> dspy.Prediction:
        return self.rewriter(original_query=query)
Because QueryRewriter wraps a dspy.ChainOfThought module, DSPy optimizers can tune both its instruction text and its few-shot demonstrations. The rewritten query feeds every downstream stage.
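
The output constraints in QueryRewriteSignature are written as prompt text in the field descriptions, so a model may still return a rewrite that violates them. As a minimal sketch, a post-check such as the one below could flag those cases, for example inside an evaluation metric. The function check_rewrite and the NOISE_PHRASES tuple are hypothetical names for illustration and are not part of DSPy-Opt; the phrase list contains only the two examples named in the signature.

```python
import re

# Assumption: only the two noise phrases quoted in the signature are checked.
NOISE_PHRASES = ("i want", "looking for")


def check_rewrite(rewritten: str, min_words: int = 5, max_words: int = 15) -> list[str]:
    """Return a list of constraint violations; an empty list means the rewrite passes."""
    problems = []

    # The signature asks for a rewrite that is 5-15 words long.
    n_words = len(rewritten.split())
    if not min_words <= n_words <= max_words:
        problems.append(f"length {n_words} outside {min_words}-{max_words} words")

    # The signature asks for conversational phrases to be removed.
    lowered = rewritten.lower()
    for phrase in NOISE_PHRASES:
        if re.search(rf"\b{re.escape(phrase)}\b", lowered):
            problems.append(f"contains noise phrase '{phrase}'")

    return problems


print(check_rewrite("Eiffel Tower height meters construction year"))
print(check_rewrite("I want tall towers"))
```

The first call passes every check and the second fails both, so a metric could penalise a rewrite by the number of violations returned.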

2. Sub-Query Generation

The rewritten query is passed to SubQueryGenerator, which uses dspy.ChainOfThought(SubQuerySignature) to decompose multi-faceted questions into 2–5 focused sub-queries (with the default bounds), each of which is retrieved separately. Unless the caller passes num_subqueries explicitly, the target count comes from the _determine_complexity() heuristic. It starts at 1 and adds one point each for a comparative keyword (compare, versus, vs, difference), a conjunction (and, &, also), a query longer than 10 words, and the presence of a colon, semicolon, or comma, then clamps the score to the configured bounds. Each generated sub-query must be self-contained and 5–12 words long.
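import json
from typing import Optional

import dspy


class SubQueryGenerator(dspy.Module):
    def __init__(self, min_subqueries: int = 2, max_subqueries: int = 5):
        super().__init__()
        # As written, min(2, ...) keeps the lower bound at 2 or less and
        # max(5, ...) keeps the upper bound at 5 or more.
        self.min_subqueries = min(2, min_subqueries)
        self.max_subqueries = max(5, max_subqueries)
        # SubQuerySignature is not shown in this excerpt.
        self.generator = dspy.ChainOfThought(SubQuerySignature)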

    def _determine_complexity(self, query: str) -> int:
        complexity = 1
        if any(word in query.lower() for word in ["compare", "versus", "vs", "difference"]):
            complexity += 1
        if any(word in query.lower() for word in ["and", "&", "also"]):
            complexity += 1
        if len(query.split()) > 10:
            complexity += 1
        if any(char in query for char in [":", ";", ","]):
            complexity += 1
        return min(self.max_subqueries, max(self.min_subqueries, complexity))

    def forward(self, query: str, num_subqueries: Optional[int] = None) -> dspy.Prediction:
        target_count = num_subqueries or self._determine_complexity(query)
        result = self.generator(
            original_query=query, num_subqueries=str(target_count)
        )
        sub_queries = json.loads(result.sub_queries)
        return dspy.Prediction(sub_queries=sub_queries, rationale=result.rationale)
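
To see how the scoring behaves on concrete inputs, the heuristic can be lifted into a standalone function. The sketch below copies the logic of _determine_complexity() from the listing above; the free function determine_complexity and its keyword arguments are only a convenience for running it without DSPy.

```python
def determine_complexity(query: str, min_subqueries: int = 2, max_subqueries: int = 5) -> int:
    """Standalone copy of the scoring logic shown in _determine_complexity()."""
    lowered = query.lower()
    complexity = 1
    # Comparative keywords (substring match, as in the original listing).
    if any(word in lowered for word in ["compare", "versus", "vs", "difference"]):
        complexity += 1
    # Conjunctions (also a substring match).
    if any(word in lowered for word in ["and", "&", "also"]):
        complexity += 1
    # Long queries.
    if len(query.split()) > 10:
        complexity += 1
    # Clause-separating punctuation.
    if any(char in query for char in [":", ";", ","]):
        complexity += 1
    # Clamp the score to the configured bounds.
    return min(max_subqueries, max(min_subqueries, complexity))


print(determine_complexity("capital of France"))                                  # 2
print(determine_complexity("compare Python and Rust performance, memory safety"))  # 4
print(determine_complexity("Holland tulip festival dates; ticket prices"))         # 3
```

The last query scores 3 rather than 2 because the checks use substring matching: "holland" contains "and", so it counts as a conjunction. Short tokens such as "vs" and "and" can therefore raise the count for queries that contain them inside longer words.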
If JSON parsing fails or the model returns fewer sub-queries than min_subqueries, SubQueryGenerator falls back to a single simplified rewrite of the original query, removing common stop words. This ensures retrieval always proceeds even when decomposition fails.
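
The forward() listing above shows only the happy path. A minimal sketch of the fallback described here might look like the following. The function name parse_sub_queries and the STOP_WORDS set are assumptions made for illustration; the library's actual stop-word list and control flow are not shown in this document.

```python
import json

# Assumption: a small illustrative stop-word set, not the library's real list.
STOP_WORDS = {"a", "an", "the", "of", "in", "on", "for", "to", "is", "are", "what", "how"}


def parse_sub_queries(raw: str, original_query: str, min_subqueries: int = 2) -> list[str]:
    """Parse the model's JSON output, falling back to one simplified query."""
    try:
        parsed = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        parsed = None

    if isinstance(parsed, list):
        # Keep only non-empty strings.
        sub_queries = [q.strip() for q in parsed if isinstance(q, str) and q.strip()]
        if len(sub_queries) >= min_subqueries:
            return sub_queries

    # Fallback: a single rewrite of the original query with stop words removed.
    kept = [word for word in original_query.split() if word.lower() not in STOP_WORDS]
    return [" ".join(kept) or original_query]


print(parse_sub_queries('["eiffel tower height", "eiffel tower construction date"]', "unused"))
print(parse_sub_queries("not json", "what is the height of the Eiffel Tower"))
```

Returning a one-element list in the fallback case keeps the return type uniform, so the retrieval loop in stage 4 needs no special handling.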

3. Metadata Extraction

MetadataExtractor calls a dedicated extractor_llm (separate from the answer LLM) via dspy.Predict(ExtractMetadataSignature) inside a dspy.context(lm=self.extractor_llm) block. It extracts structured fields defined in a user-provided JSON schema (for example, title and category in the FreshQA config) and returns only the non-null fields as a plain Python dictionary. Extraction runs once for the main rewritten query and once for each sub-query, yielding per-query metadata dictionaries.

The schema is validated before each call: every property must use one of the allowed types (string, number, boolean), and enum is restricted to string fields. Any extraction failure returns an empty dict {} so the pipeline degrades gracefully.
# Example metadata schema (from freshqa_rag_mipro_config.yml)
metadata_schema = {
    "properties": {
        "title": {
            "type": "string",
            "description": "The main title or name of the subject"
        },
        "category": {
            "type": "string",
            "description": "Primary category or type of content"
        }
    }
}

metadata_extractor = MetadataExtractor(extractor_llm=extractor_lm)
# Returns e.g. {"title": "Eiffel Tower", "category": "Landmark"}
metadata = metadata_extractor(rewritten_query, metadata_schema)
The extractor is intentionally instructed not to use placeholders like "Unknown" or "N/A". Only fields explicitly stated in the input text are populated, making the resulting filter predicates reliable rather than noisy.
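
The validation and null-dropping rules described in this stage can be sketched in plain Python. The helper names validate_schema and drop_empty_fields are hypothetical and the error messages are invented for illustration; only the rules themselves (allowed types, enum on string fields only, non-null fields returned) come from the description above.

```python
ALLOWED_TYPES = {"string", "number", "boolean"}


def validate_schema(schema: dict) -> None:
    """Raise ValueError if a property breaks the rules described above."""
    properties = schema.get("properties")
    if not isinstance(properties, dict):
        raise ValueError("schema must contain a 'properties' mapping")
    for name, spec in properties.items():
        field_type = spec.get("type")
        if field_type not in ALLOWED_TYPES:
            raise ValueError(f"property '{name}' has unsupported type {field_type!r}")
        if "enum" in spec and field_type != "string":
            raise ValueError(f"property '{name}' uses enum on a non-string field")


def drop_empty_fields(extracted: dict, schema: dict) -> dict:
    """Keep only schema-defined fields whose value is not None."""
    allowed = schema["properties"].keys()
    return {key: value for key, value in extracted.items() if key in allowed and value is not None}


schema = {
    "properties": {
        "title": {"type": "string", "description": "The main title or name of the subject"},
        "category": {"type": "string", "description": "Primary category or type of content"},
    }
}
validate_schema(schema)  # passes silently
print(drop_empty_fields({"title": "Eiffel Tower", "category": None}, schema))
```

With the FreshQA schema, a query that names a subject but no category yields a dictionary with only the title key, so no filter is built for the missing field.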

4. Document Retrieval

WeaviateRetriever performs hybrid search, combining dense vector similarity with keyword-based BM25, against a named Weaviate collection. It accepts an optional precomputed embedding vector and an optional metadata filter. The filter is built from the extracted metadata dictionary: only keys present in the metadata_schema passed to the retriever are translated into Weaviate Filter predicates via metadata_to_weaviate_filter().

Retrieval is called once for the main rewritten query and once per sub-query. All passage lists are concatenated, then deduplicated with dict.fromkeys() to preserve insertion order:
all_passages = []

# Retrieve for the main rewritten query
main_retrieval = self.retriever(
    query=rewritten_query,
    query_embedding=self.embedding_model.encode(rewritten_query),
    top_k=self.top_k,
    metadata=rewritten_query_metadata,
)
all_passages.extend(main_retrieval.passages)

# Retrieve for each sub-query
for sub_query, sub_query_metadata in zip(sub_queries, sub_queries_metadata):
    sub_retrieval = self.retriever(
        query=sub_query,
        query_embedding=self.embedding_model.encode(sub_query),
        top_k=self.top_k,
        metadata=sub_query_metadata,
    )
    all_passages.extend(sub_retrieval.passages)

# Deduplicate while preserving order
unique_passages = list(dict.fromkeys(all_passages))
If no passages are retrieved, the pipeline substitutes ["No relevant context found in the knowledge base."] so that stage 5 always receives a non-empty context list.
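
The aggregation step can be tried in isolation with plain strings. The sketch below combines the dict.fromkeys() deduplication with the empty-result placeholder; merge_passages and NO_CONTEXT are hypothetical names used only for this example.

```python
NO_CONTEXT = "No relevant context found in the knowledge base."


def merge_passages(*passage_lists: list[str]) -> list[str]:
    """Concatenate passage lists, drop exact duplicates, and keep first-seen order."""
    merged = [passage for passages in passage_lists for passage in passages]
    # dict keys are unique and remember insertion order, so this deduplicates stably.
    unique = list(dict.fromkeys(merged))
    # Substitute the placeholder when nothing was retrieved.
    return unique or [NO_CONTEXT]


main_hits = ["Paris is the capital of France.", "The Eiffel Tower is in Paris."]
sub_hits = ["The Eiffel Tower is in Paris.", "The tower opened in 1889."]
print(merge_passages(main_hits, sub_hits))
print(merge_passages([], []))
```

This approach removes only exact string matches and requires hashable items, so passages that differ by whitespace or casing are kept as separate entries.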

5. Answer Generation

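The deduplicated passage list and the original question are fed into dspy.ChainOfThought(FreshQAAnswerSignature). The signature declares four output fields: rewritten_query, sub_queries, answer, and reasoning. FreshQARAG.forward() reads only answer and reasoning from this result; the rewritten_query and sub_queries in the final prediction come from stages 1 and 2. The reasoning field exposes the model’s chain of thought, that is, how it synthesised the retrieved passages into a final answer.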
class FreshQAAnswerSignature(dspy.Signature):
    """Signature for generating answers to FreshQA questions."""

    context = dspy.InputField(desc="List of relevant passages from the knowledge base")
    question = dspy.InputField(desc="The original question to be answered")
    rewritten_query = dspy.OutputField(desc="Rewritten query to optimize for retrieval")
    sub_queries = dspy.OutputField(
        desc="List of sub-queries generated for complex questions"
    )
    answer = dspy.OutputField(desc="Concise and accurate answer to the question")
    reasoning = dspy.OutputField(desc="Brief explanation of how the answer was derived")
The pipeline’s forward() method wraps stage 5’s output in a dspy.Prediction that exposes all intermediate state — question, rewritten_query, sub_queries, retrieved_context, answer, and reasoning — so that downstream evaluation and optimization have full visibility into how the answer was produced.

Complete forward() Method

The FreshQARAG.forward() method ties all five stages together. A top-level try/except provides a fallback path if any stage raises an unhandled exception: the pipeline generates an answer directly from "Limited context available" rather than crashing. The listing below shows only the main path and omits that try/except wrapper.
from typing import Any, Dict

import dspy
from sentence_transformers import SentenceTransformer


class FreshQARAG(dspy.Module):
    """Complete FreshQA RAG pipeline using DSPy framework."""

    def __init__(
        self,
        query_rewriter: QueryRewriter,
        sub_query_generator: SubQueryGenerator,
        metadata_extractor: MetadataExtractor,
        metadata_schema: Dict[str, Any],
        weaviate_retriever: WeaviateRetriever,
        embedding_model: SentenceTransformer,
        top_k: int = 3,
    ):
        super().__init__()
        self.query_rewriter = query_rewriter
        self.sub_query_generator = sub_query_generator
        self.metadata_extractor = metadata_extractor
        self.metadata_schema = metadata_schema
        self.retriever = weaviate_retriever
        self.embedding_model = embedding_model
        self.top_k = top_k
        self.generate_answer = dspy.ChainOfThought(FreshQAAnswerSignature)

    def forward(self, question: str) -> dspy.Prediction:
        """Execute the complete RAG pipeline."""
        # Stage 1: Query rewriting
        rewritten_query = self.query_rewriter(question).rewritten_query

        # Stage 2: Sub-query decomposition
        sub_queries = self.sub_query_generator(rewritten_query).sub_queries

        # Stage 3: Metadata extraction (main query + each sub-query)
        rewritten_query_metadata = self.metadata_extractor(
            rewritten_query, self.metadata_schema
        )
        sub_queries_metadata = [
            self.metadata_extractor(sub_query, self.metadata_schema)
            for sub_query in sub_queries
        ]

        # Stage 4: Hybrid retrieval (main query + sub-queries)
        all_passages = []
        main_retrieval = self.retriever(
            query=rewritten_query,
            query_embedding=self.embedding_model.encode(rewritten_query),
            top_k=self.top_k,
            metadata=rewritten_query_metadata,
        )
        all_passages.extend(main_retrieval.passages)
        for sub_query, sub_query_metadata in zip(sub_queries, sub_queries_metadata):
            sub_retrieval = self.retriever(
                query=sub_query,
                query_embedding=self.embedding_model.encode(sub_query),
                top_k=self.top_k,
                metadata=sub_query_metadata,
            )
            all_passages.extend(sub_retrieval.passages)

        unique_passages = list(dict.fromkeys(all_passages))
        if not unique_passages:
            unique_passages = ["No relevant context found in the knowledge base."]

        # Stage 5: Chain-of-thought answer generation
        answer_result = self.generate_answer(
            context=unique_passages, question=question
        )

        return dspy.Prediction(
            question=question,
            rewritten_query=rewritten_query,
            sub_queries=sub_queries,
            retrieved_context=unique_passages,
            answer=answer_result.answer,
            reasoning=answer_result.reasoning,
        )
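
The try/except fallback mentioned before the listing is not shown in it. As a rough sketch of the pattern, the wrapper below uses plain callables and dictionaries so that it runs without DSPy. The name run_with_fallback, the dictionary return type, and the choice to report the original question as rewritten_query with an empty sub_queries list are all assumptions made for illustration; only the "Limited context available" placeholder comes from the description above.

```python
from typing import Any, Callable, Dict

FALLBACK_CONTEXT = ["Limited context available"]


def run_with_fallback(
    run_stages: Callable[[str], Dict[str, Any]],
    generate_answer: Callable[..., Dict[str, Any]],
    question: str,
) -> Dict[str, Any]:
    """Run the five stages; on any error, answer from the placeholder context."""
    try:
        return run_stages(question)
    except Exception:
        # Skip retrieval entirely and answer from the placeholder context.
        result = generate_answer(context=FALLBACK_CONTEXT, question=question)
        return {
            "question": question,
            "rewritten_query": question,  # assumption: no rewrite is available
            "sub_queries": [],            # assumption: no decomposition is available
            "retrieved_context": FALLBACK_CONTEXT,
            "answer": result["answer"],
            "reasoning": result["reasoning"],
        }


def failing_stages(question: str) -> Dict[str, Any]:
    raise RuntimeError("retriever unavailable")


def stub_generate(context, question):
    return {"answer": "unknown", "reasoning": "no passages were retrieved"}


print(run_with_fallback(failing_stages, stub_generate, "Who won the match?"))
```

Catching Exception this broadly keeps the pipeline from crashing during batch evaluation, at the cost of hiding the underlying error unless it is logged separately.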

Which Stages Are Optimizable

Stages 1, 2, 3, and 5 each contain DSPy modules with learnable prompts:

| Stage | Module | DSPy Module | Optimizable |
| --- | --- | --- | --- |
| 1 | QueryRewriter | dspy.ChainOfThought(QueryRewriteSignature) | ✅ Yes |
| 2 | SubQueryGenerator | dspy.ChainOfThought(SubQuerySignature) | ✅ Yes |
| 3 | MetadataExtractor | dspy.Predict(ExtractMetadataSignature) | ✅ Yes |
| 4 | WeaviateRetriever | Deterministic hybrid search | ❌ No |
| 5 | generate_answer | dspy.ChainOfThought(FreshQAAnswerSignature) | ✅ Yes |

Stage 4 (WeaviateRetriever) is a deterministic database call with no learnable parameters. All other stages expose their instruction text and few-shot slots to the DSPy optimizer during compilation.
