Customizing Intent Maps
The QueryDecomposer class uses regex-based intent detection. Customize it for your domain:
Adding New Intents
components.py:9-22
class QueryDecomposer:
    """Decomposes user queries; intent detection is regex-rule driven."""

    def _extract_intent(self, query: str) -> str:
        """Return the first intent whose pattern matches ``query``.

        Rules are ordered most-specific first — the first match wins.
        Falls back to "unknown" when no rule matches.
        """
        rules = (
            ("penalty", r"\blate fee\b|\boverdue\b|\bpenalt(y|ies)\b"),
            ("payment_terms", r"\bpay\b|\binvoice\b|\bpayment\b"),
            ("intellectual_property", r"\bowned by\b|\blicense\b|\bintellectual property\b|\bIP\b|\binfring"),
            ("indemnification", r"\bindemnif(y|ication)\b|\bthird-party claims\b"),
            ("termination", r"\bterminate\b|\btermination\b|\bwritten notice\b"),
            ("confidentiality", r"\bconfidential\b|\bproprietary information\b"),
            ("scope_of_services", r"\bshall provide\b|\bservices\b"),
        )
        return next(
            (name for name, rx in rules if re.search(rx, query, re.IGNORECASE)),
            "unknown",
        )
class CustomQueryDecomposer(QueryDecomposer):
    """QueryDecomposer extended with a warranty intent, checked first."""

    def _extract_intent(self, query: str) -> str:
        r"""Return the first matching intent for ``query``, else "unknown".

        The warranty rule uses \bwarrant(y|ies)?\b so that plural queries
        like "What warranties are included?" are detected too — the plain
        alternation \bwarranty\b|\bwarrant\b misses "warranties".
        """
        intent_map = [
            ("warranty", r"\bwarrant(y|ies)?\b|\bguarantee\b"),
            ("penalty", r"\blate fee\b|\boverdue\b|\bpenalt(y|ies)\b"),
            # ... rest of intents
        ]
        for intent, pattern in intent_map:
            if re.search(pattern, query, re.IGNORECASE):
                return intent
        return "unknown"
Place more specific intents first in the list. The first match wins.
Using Custom Decomposer
Replace the default decomposer in your workflow:
from nodes import decompose_node
async def custom_decompose_node(state: DocMindState) -> DocMindState:
    """Workflow node that decomposes the query via CustomQueryDecomposer."""
    # Use the custom decomposer in place of the default one.
    decomposition = await CustomQueryDecomposer().decompose(state["query"])
    state["decomposition"] = decomposition
    state["node_history"] = state.get("node_history", []) + ["decompose"]
    return state
Customizing Section Retrieval
Adjusting Intent-Section Mapping
The AgenticRetriever class maps intents to target sections:
components.py:67-75
class AgenticRetriever:
    """Retrieves contract sections, routed by the query's detected intent."""

    # Maps a detected intent to the section titles to target, best-first.
    # The first title in each list is the primary match and earns an extra
    # scoring bonus in _score_section.
    INTENT_SECTION_MAP = {
        "penalty": ["Late Payment Penalties", "Payment Terms"],
        "payment_terms": ["Payment Terms", "Late Payment Penalties"],
        "intellectual_property": ["Intellectual Property Rights"],
        "indemnification": ["Indemnification"],
        "termination": ["Termination for Convenience"],
        "confidentiality": ["Confidentiality"],
        "scope_of_services": ["Scope of Services"],
    }
class CustomAgenticRetriever(AgenticRetriever):
    """AgenticRetriever with the intent map extended for warranty queries."""

    # This class attribute replaces the parent's map wholesale — include
    # every intent you still need, not just the additions.
    INTENT_SECTION_MAP = {
        "warranty": ["Warranties and Representations", "Disclaimer of Warranties"],
        "penalty": ["Late Payment Penalties", "Payment Terms"],
        # ... rest of mappings
    }
Customizing Scoring Logic
The _score_section method determines relevance:
components.py:80-107
def _score_section(self, section: Dict, query: str, decomposition: Dict) -> float:
score = 0.0
content_lower = section["content"].lower()
title_lower = section["title"].lower()
# intent-based scoring (highest priority)
intent = decomposition.get("intent", "unknown")
target_sections = self.INTENT_SECTION_MAP.get(intent, [])
if section["title"] in target_sections:
score += 5.0
if section["title"] == target_sections[0]: # Primary match
score += 2.0
# entity matching
entities = decomposition.get("entities", [])
for entity in entities:
if entity in content_lower:
score += 1.0
if entity in title_lower:
score += 1.5
# query term matching
query_terms = query.lower().split()
for term in query_terms:
if len(term) > 3 and term in content_lower:
score += 0.5
return score
class RecencyAwareRetriever(AgenticRetriever):
    """Retriever that favors recently-updated sections."""

    def _score_section(self, section: Dict, query: str, decomposition: Dict) -> float:
        """Base relevance plus a recency bonus that fades to zero over a year."""
        base = super()._score_section(section, query, decomposition)
        if "last_updated" not in section:
            return base
        import datetime
        # NOTE(review): assumes section["last_updated"] is a naive datetime
        # comparable to datetime.now() — confirm against the document store.
        age_days = (datetime.datetime.now() - section["last_updated"]).days
        # 2.0 for brand-new sections, decaying linearly; never negative.
        return base + max(0, 2.0 - (age_days / 365))
Adjusting Relevance Threshold
Change the minimum relevance score:
components.py:135
relevant = self._filter_irrelevant(scored_sections, threshold=2.0)
class BroadRetriever(AgenticRetriever):
    """Example retriever that casts a wider net than the default."""

    async def retrieve(self, query: str, decomposition: Dict) -> List[Dict]:
        """Retrieve with a lower relevance bar and a larger result cap.

        NOTE(review): `scored_sections` is produced by the scoring logic
        elided below — this snippet is not runnable as shown.
        """
        # ... scoring logic ...
        relevant = self._filter_irrelevant(scored_sections, threshold=1.5)  # Lower threshold
        if not relevant:
            # Fall back to the single best-scored section rather than nothing.
            relevant = scored_sections[:1]
        else:
            relevant = relevant[:7]  # Return up to 7 sections instead of 5
        return relevant
Customizing Judge Behavior
Adjusting Confidence Scoring
The judge uses weighted penalties for different claim types:
components.py:282-291
# weighted scoring: contradictions are severe, unsupported is moderate
# NOTE(review): assumes total_claims > 0 — a response yielding zero claims
# would raise ZeroDivisionError here; confirm upstream guarantees this.
confidence_score = 1.0
confidence_score -= (contradicted_count / total_claims) * 0.8  # Heavy penalty
confidence_score -= (unsupported_count / total_claims) * 0.3  # Moderate penalty
confidence_score = max(0.0, min(1.0, confidence_score))  # clamp to [0, 1]
# determine if hallucinated: any contradiction, or low overall confidence
is_hallucinated = contradicted_count > 0 or confidence_score < 0.5
class LenientJudge(LLMJudge):
    """Judge variant with lighter penalties and a stricter hallucination bar."""

    async def evaluate(self, response: str, context: List[Dict]) -> Dict:
        """Evaluate ``response`` against ``context``, tolerating weaker claims.

        Compared to the default judge: penalty weights drop from 0.8/0.3
        to 0.6/0.2, and the hallucination flag requires more than one
        contradiction or confidence below 0.3 (instead of any
        contradiction or confidence below 0.5).
        """
        # ... claim extraction and evaluation ...
        confidence_score = 1.0
        confidence_score -= (contradicted_count / total_claims) * 0.6  # Lighter penalty
        confidence_score -= (unsupported_count / total_claims) * 0.2  # Lighter penalty
        confidence_score = max(0.0, min(1.0, confidence_score))
        # Higher threshold for hallucination flag
        is_hallucinated = contradicted_count > 1 or confidence_score < 0.3
        return {
            "confidence_score": round(confidence_score, 2),
            "is_hallucinated": is_hallucinated,
            "should_return": not is_hallucinated,
            # ... rest of verdict
        }
Adding Custom Claim Types
Extend claim extraction with domain-specific patterns:
class CustomJudge(LLMJudge):
def _extract_claims(self, response: str) -> List[Dict]:
    """Extract claims, adding a domain-specific "warranty" claim type.

    Sentences mentioning warranty/guarantee language are appended as
    claims of type "warranty" unless an identical claim text already
    exists among the base claims.

    Args:
        response: Generated answer text to mine for claims.

    Returns:
        The base claims plus any newly found warranty claims.
    """
    claims = super()._extract_claims(response)
    warranty_sentences = re.findall(
        r'([^.]*(?:warranty|warranted|guarantee)[^.]*\.)',
        response,
        re.IGNORECASE
    )
    # Track claim texts in a set: membership stays O(1) per sentence
    # instead of rebuilding a list of every claim text on each iteration.
    seen = {c["text"] for c in claims}
    for sentence in warranty_sentences:
        text = sentence.strip()
        if text not in seen:
            seen.add(text)
            claims.append({
                "text": text,
                "type": "warranty"
            })
    return claims
Customizing Response Generation
Changing Response Format
The ResponseGenerator class controls output formatting:
components.py:312-329
class ResponseGenerator:
    """Builds a plain-text answer citing the lead sentence of each section."""

    def generate(self, sections: List[Dict]) -> str:
        """Return one cited snippet per section, joined by spaces.

        Each snippet is the section's first sentence (or its first 200
        chars as a fallback) followed by a "(See title, page N)" citation.
        """
        parts = []
        for sec in sections:
            title = sec.get("title", "Unknown")
            page = sec.get("page_num", "?")
            body = sec.get("content", "")
            pieces = re.split(r'(?<=[.!?])\s+', body)
            lead = pieces[0] if pieces else body[:200]
            parts.append(f"{lead} (See {title}, page {page})")
        return " ".join(parts)
import json
class JSONResponseGenerator(ResponseGenerator):
    """Emits the answer as a pretty-printed JSON document."""

    def generate(self, sections: List[Dict]) -> str:
        """Return '{"answer": [...], "sources": [...]}' as indented JSON.

        With no sections, returns a JSON error object instead.
        """
        if not sections:
            return json.dumps({"error": "No relevant information found"})
        answers = []
        sources = []
        for sec in sections:
            body = sec.get("content", "")
            pieces = re.split(r'(?<=[.!?])\s+', body)
            answers.append(pieces[0] if pieces else body[:200])
            sources.append({
                "title": sec.get("title", "Unknown"),
                "page": sec.get("page_num", "?"),
                "section_id": sec.get("section_id", "")
            })
        return json.dumps({"answer": answers, "sources": sources}, indent=2)
class MarkdownResponseGenerator(ResponseGenerator):
    """Emits the answer as a numbered Markdown list with source lines."""

    def generate(self, sections: List[Dict]) -> str:
        """Return a Markdown "## Answer" list, one numbered entry per section."""
        if not sections:
            return "**No relevant information found.**"
        lines = ["## Answer\n"]
        for idx, sec in enumerate(sections, 1):
            title = sec.get("title", "Unknown")
            page = sec.get("page_num", "?")
            body = sec.get("content", "")
            pieces = re.split(r'(?<=[.!?])\s+', body)
            lead = pieces[0] if pieces else body[:200]
            lines.append(f"{idx}. {lead}\n *Source: {title}, page {page}*\n")
        return "\n".join(lines)
Customizing Workflow
Modifying Retry Logic
The should_retry function controls retry behavior:
nodes.py:47-55
def should_retry(state: DocMindState) -> str:
    """Route to "retry" after a hallucinated verdict, else to "output".

    Retries are capped at 2 attempts.
    """
    verdict = state.get("judge_verdict", {})
    attempts = state.get("retry_count", 0)
    # Guard clauses: finish unless hallucinated with retries remaining.
    if not verdict.get("is_hallucinated", False):
        return "output"
    if attempts >= 2:
        return "output"
    log_retry_attempt(attempts + 1, 2)
    return "retry"
def should_retry_extended(state: DocMindState) -> str:
    """Same routing as should_retry, but allowing up to 4 retries."""
    verdict = state.get("judge_verdict", {})
    attempts = state.get("retry_count", 0)
    hallucinated = verdict.get("is_hallucinated", False)
    if hallucinated and attempts < 4:
        log_retry_attempt(attempts + 1, 4)
        return "retry"
    return "output"
def confidence_based_retry(state: DocMindState) -> str:
    """Retry on low confidence (< 0.7) rather than on the hallucination flag.

    Allows up to 3 retries before routing to "output".
    """
    verdict = state.get("judge_verdict", {})
    attempts = state.get("retry_count", 0)
    low_confidence = verdict.get("confidence_score", 1.0) < 0.7
    if low_confidence and attempts < 3:
        log_retry_attempt(attempts + 1, 3)
        return "retry"
    return "output"
Adding Custom Nodes
Extend the workflow with new nodes:
from langgraph.graph import StateGraph, END
from state_types import DocMindState
async def summarize_node(state: DocMindState) -> DocMindState:
    """Summarize retrieved sections before generation.

    Attaches a "summary" key to every retrieved section, truncated to
    100 characters. The ellipsis is appended only when the content was
    actually truncated, so short sections are not mislabeled as cut off
    (the original unconditionally appended "...").
    """
    summarized = []
    for section in state["retrieved_sections"]:
        content = section.get("content", "")
        summary = content[:100] + "..." if len(content) > 100 else content
        summarized.append({**section, "summary": summary})
    state["retrieved_sections"] = summarized
    state["node_history"] = state.get("node_history", []) + ["summarize"]
    return state
def build_custom_workflow() -> StateGraph:
    """Build the DocMind graph with an extra summarization step.

    Pipeline: decompose -> retrieve -> summarize -> generate -> judge,
    with the judge conditionally looping back to "retrieve" on "retry".

    Returns:
        The compiled LangGraph workflow.
    """
    workflow = StateGraph(DocMindState)
    workflow.add_node("decompose", decompose_node)
    workflow.add_node("retrieve", retrieve_node)
    workflow.add_node("summarize", summarize_node)  # New node
    workflow.add_node("generate", generate_node)
    workflow.add_node("judge", judge_node)
    workflow.add_node("output", output_node)
    workflow.set_entry_point("decompose")
    workflow.add_edge("decompose", "retrieve")
    workflow.add_edge("retrieve", "summarize")  # Add summarization step
    workflow.add_edge("summarize", "generate")
    workflow.add_edge("generate", "judge")
    # should_retry returns "retry" or "output"; map each to its target node.
    workflow.add_conditional_edges(
        "judge",
        should_retry,
        {
            "retry": "retrieve",
            "output": "output"
        }
    )
    workflow.add_edge("output", END)
    return workflow.compile()
Using Custom Components
Create a custom runner that uses your components:
custom_runner.py
import asyncio
from state_types import DocMindState
# Import custom components
from custom_components import (
CustomQueryDecomposer,
RecencyAwareRetriever,
LenientJudge,
MarkdownResponseGenerator
)
from mock_data import MockDocumentStore
from workflow import build_custom_workflow
async def run_custom_docmind(query: str) -> str:
    """Run the custom workflow for ``query`` and return the final output."""
    # Fresh state: all pipeline slots empty, retry counter at zero.
    state: DocMindState = {
        "query": query,
        "decomposition": None,
        "retrieved_sections": [],
        "generated_response": None,
        "judge_verdict": None,
        "final_output": None,
        "retry_count": 0,
        "node_history": [],
    }
    app = build_custom_workflow()  # Use custom workflow
    result = await app.ainvoke(state)
    return result["final_output"]
if __name__ == "__main__":
    async def _demo() -> None:
        """Run a sample warranty query and print the answer."""
        answer = await run_custom_docmind("What are the warranty terms?")
        print(answer)

    asyncio.run(_demo())
Configuration File Approach
For easier maintenance, use configuration files:
config.py
from typing import Dict, List, Tuple
import re
class DocMindConfig:
    """Central knobs for DocMind: intent rules, routing, scoring, limits."""

    # Intent patterns — ordered (intent, regex) pairs; first match wins,
    # so more specific intents must come first.
    # NOTE(review): \bwarranty\b|\bwarrant\b does not match "warranties";
    # consider \bwarrant(y|ies)?\b if plural queries must be detected.
    INTENT_PATTERNS: List[Tuple[str, str]] = [
        ("warranty", r"\bwarranty\b|\bwarrant\b|\bguarantee\b"),
        ("penalty", r"\blate fee\b|\boverdue\b|\bpenalt(y|ies)\b"),
        ("payment_terms", r"\bpay\b|\binvoice\b|\bpayment\b"),
    ]

    # Intent to section mapping — first title per list is the primary target.
    INTENT_SECTION_MAP: Dict[str, List[str]] = {
        "warranty": ["Warranties and Representations"],
        "penalty": ["Late Payment Penalties", "Payment Terms"],
        "payment_terms": ["Payment Terms", "Late Payment Penalties"],
    }

    # Scoring weights (points added per match in _score_section)
    SCORE_INTENT_MATCH = 5.0       # title is in the intent's target list
    SCORE_PRIMARY_MATCH = 2.0      # extra bonus for the primary target
    SCORE_ENTITY_IN_CONTENT = 1.0  # per entity found in section content
    SCORE_ENTITY_IN_TITLE = 1.5    # per entity found in section title
    SCORE_TERM_MATCH = 0.5         # per query term (> 3 chars) in content

    # Retrieval settings
    RELEVANCE_THRESHOLD = 2.0      # minimum score for a section to be kept
    MAX_SECTIONS = 5               # cap on returned sections

    # Judge settings
    CONTRADICTION_PENALTY = 0.8    # weight on the contradicted-claim ratio
    UNSUPPORTED_PENALTY = 0.3      # weight on the unsupported-claim ratio
    HALLUCINATION_THRESHOLD = 0.5  # confidence below this flags hallucination

    # Retry settings
    MAX_RETRIES = 2                # max regeneration attempts after a bad verdict
from config import DocMindConfig
class ConfigurableRetriever(AgenticRetriever):
    """AgenticRetriever whose weights and routing come from a DocMindConfig."""

    def __init__(self, doc_store, config: DocMindConfig):
        """Store ``config`` and mirror its intent map onto the instance.

        Args:
            doc_store: Document store passed through to the base class.
            config: Source of scoring weights and the intent-section map.
        """
        super().__init__(doc_store)
        self.config = config
        # Instance attribute shadows the class-level map.
        self.INTENT_SECTION_MAP = config.INTENT_SECTION_MAP

    def _score_section(self, section: Dict, query: str, decomposition: Dict) -> float:
        """Score a section using weights read from the config."""
        score = 0.0
        # Use config values
        intent = decomposition.get("intent", "unknown")
        target_sections = self.INTENT_SECTION_MAP.get(intent, [])
        if section["title"] in target_sections:
            score += self.config.SCORE_INTENT_MATCH
            if section["title"] == target_sections[0]:
                score += self.config.SCORE_PRIMARY_MATCH
        # ... rest of scoring using config values
        return score
Testing Custom Components
Always test your customizations:
test_custom.py
import pytest
from custom_components import CustomQueryDecomposer, RecencyAwareRetriever
from mock_data import MockDocumentStore
@pytest.mark.asyncio
async def test_custom_intent_detection():
    """The custom decomposer should classify warranty questions.

    NOTE(review): "warranties" is not matched by the documented pattern
    \bwarranty\b|\bwarrant\b — this test passes only if the pattern
    also covers the plural form (e.g. \bwarrant(y|ies)?\b).
    """
    decomposer = CustomQueryDecomposer()
    result = await decomposer.decompose("What warranties are included?")
    assert result["intent"] == "warranty", "Should detect warranty intent"
@pytest.mark.asyncio
async def test_recency_aware_scoring():
    """The recency-aware retriever should return positively scored sections."""
    store = MockDocumentStore()
    retriever = RecencyAwareRetriever(store)
    results = await retriever.retrieve(
        "What are the payment terms?",
        {"intent": "payment_terms"}
    )
    assert len(results) > 0, "Should retrieve sections"
    assert results[0]["_relevance_score"] > 0, "Should have positive score"
Next Steps
Installation
Set up DocMind with all dependencies
Testing
Run the comprehensive test suite