This guide covers best practices, configuration patterns, and deployment strategies for running VectorDB pipelines in production.
Deployment checklist
Before deploying to production, ensure you have:
Environment configuration
Production environment variables
Use environment variables for all secrets and environment-specific settings:
# Vector Database Credentials
PINECONE_API_KEY=pc-prod-xxxx
WEAVIATE_URL=https://prod-cluster.weaviate.network
WEAVIATE_API_KEY=weaviate-prod-key
MILVUS_URI=https://prod-milvus.example.com:19530
MILVUS_TOKEN=milvus-prod-token
QDRANT_URL=https://qdrant-prod.example.com
QDRANT_API_KEY=qdrant-prod-key

# LLM API Keys
GROQ_API_KEY=gsk_prod_xxxx
OPENAI_API_KEY=sk-prod-xxxx
COHERE_API_KEY=cohere-prod-xxxx

# Deployment Settings
LOG_LEVEL=WARNING
ENABLE_TELEMETRY=true
MAX_RETRIES=3
TIMEOUT_SECONDS=30
See the environment variables reference for the complete list.
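A missing secret is easier to diagnose at startup than on the first failed query. The following is a minimal sketch of a startup check. The helper names and the `REQUIRED_VARS` list are hypothetical and not part of VectorDB; adjust the list to the databases and LLM providers you actually use.

```python
import os

# Hypothetical list: include only the variables your deployment needs.
REQUIRED_VARS = ["PINECONE_API_KEY", "GROQ_API_KEY"]


def missing_env_vars(required, environ=None):
    """Return the names in `required` that are unset or empty in `environ`."""
    environ = os.environ if environ is None else environ
    return [name for name in required if not environ.get(name)]


def check_environment(required=REQUIRED_VARS, environ=None):
    """Raise at startup if any required variable is unset or empty."""
    missing = missing_env_vars(required, environ)
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
```

Calling `check_environment()` once before constructing any pipeline keeps the error message close to its cause.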
Configuration file structure
Organize configurations by environment:
configs/
├── base.yaml # Shared settings
├── development.yaml # Dev overrides
├── staging.yaml # Staging overrides
└── production.yaml # Production settings
Base configuration (configs/base.yaml):
embeddings:
  model: "Qwen/Qwen3-Embedding-0.6B"
  batch_size: 32
  device: "cpu"
logging:
  name: "vectordb_pipeline"
  level: "INFO"
Production overrides (configs/production.yaml):
embeddings:
  batch_size: 64  # Higher throughput in production
  device: "cuda"  # GPU acceleration
search:
  top_k: 10
  candidate_pool_size: 15  # Cost-optimized
rag:
  enabled: true
  model: "llama-3.3-70b-versatile"
  api_key: "${GROQ_API_KEY}"
  temperature: 0.7
  max_tokens: 2048
logging:
  level: "WARNING"  # Reduce log volume
retry:
  max_attempts: 3
  backoff_factor: 2
Load environment-specific config:
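import os
from vectordb.utils.config_loader import ConfigLoader

# PineconeSemanticSearchPipeline must also be imported from the vectordb
# package; its module path is not shown in this guide.

# Select the config file from the ENVIRONMENT variable (defaults to development)
env = os.getenv("ENVIRONMENT", "development")
config_path = f"configs/{env}.yaml"
pipeline = PineconeSemanticSearchPipeline(config_path)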
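This guide does not show how base.yaml is combined with the environment file. One common approach is a recursive merge in which override values win. The sketch below illustrates that idea on plain dictionaries; `deep_merge` is a hypothetical helper, and whether `ConfigLoader` behaves this way is an assumption, not something confirmed here.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict; `override` wins, and nested dicts merge recursively."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Values taken from the base and production examples above.
base = {
    "embeddings": {"model": "Qwen/Qwen3-Embedding-0.6B", "batch_size": 32, "device": "cpu"},
    "logging": {"name": "vectordb_pipeline", "level": "INFO"},
}
production = {
    "embeddings": {"batch_size": 64, "device": "cuda"},
    "logging": {"level": "WARNING"},
}
config = deep_merge(base, production)
```

With this merge, production keeps the base embedding model and logger name while overriding only the batch size, device, and log level.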
Database-specific deployment considerations
Pinecone
Namespace strategy:
pinecone:
  api_key: "${PINECONE_API_KEY}"
  index_name: "production-index"
  namespace: "v1"  # Version namespaces for zero-downtime updates
  dimension: 1024
  metric: "cosine"
  recreate: false  # Never recreate in production
Multi-tenancy: Use namespaces for tenant isolation (scales to 100,000+ tenants):
for tenant_id in tenant_ids:
    result = pipeline.search(
        query=user_query,
        top_k=10,
        namespace=f"tenant-{tenant_id}",
    )
Best practices:
Monitor pod utilization and scale replicas based on QPS
Use serverless indexes for variable workloads
Implement retry logic for rate limit errors (429)
Weaviate
Connection configuration:
weaviate:
  cluster_url: "${WEAVIATE_URL}"
  api_key: "${WEAVIATE_API_KEY}"
  timeout: 30
  connection_pool_size: 10
Multi-tenancy: Weaviate supports native multi-tenancy with per-tenant shards:
# Initialize with tenant support
pipeline = WeaviateSemanticSearchPipeline(
    config_path,
    tenant="customer-123",
)
Best practices:
Use batch imports for initial indexing (100+ docs/batch)
Enable quantization (PQ or BQ) to reduce memory 4x
Monitor shard health and replication status
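The batch-import recommendation above can be sketched with a small generator that groups documents into fixed-size batches. `batched` is a hypothetical helper, and the indexing call in the usage comment stands in for whatever batch API your pipeline exposes.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield successive lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Usage (index_batch is a placeholder for your own indexing call):
# for batch in batched(documents, batch_size=100):
#     index_batch(batch)
```

Because the helper consumes any iterable lazily, it also works when documents are streamed from disk rather than held in memory.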
Milvus
Production configuration:
milvus:
  uri: "${MILVUS_URI}"
  token: "${MILVUS_TOKEN}"
  collection_name: "production_collection"
  dimension: 1024
  recreate: false
  batch_size: 100
Partition-based multi-tenancy:
# Scales to millions of tenants using partition keys
config = {
    "milvus": {
        "partition_key": "tenant_id",
        "num_partitions": 1000,
    }
}
Best practices:
Use scalar quantization (SQ8) for 4x storage reduction
Enable partition pruning with metadata filters
Monitor memory usage per collection
Set appropriate index_file_size for write throughput
Qdrant
Production setup:
qdrant:
  url: "${QDRANT_URL}"
  api_key: "${QDRANT_API_KEY}"
  collection_name: "production_docs"
  timeout: 30
  prefer_grpc: true  # Better performance
Payload-based multi-tenancy:
# Use payload filters for tenant isolation
filters = {
    "must": [
        {"key": "tenant_id", "match": {"value": tenant_id}}
    ]
}
result = pipeline.search(
    query=user_query,
    top_k=10,
    filters=filters,
)
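With payload-based isolation, a query that forgets the tenant condition can return another tenant's documents. One way to reduce that risk is to build every filter through a single helper. The sketch below reuses the filter shape from the example above; `tenant_filter` and its `extra_conditions` parameter are hypothetical names.

```python
def tenant_filter(tenant_id: str, extra_conditions=None) -> dict:
    """Build a filter that always includes the tenant condition first."""
    if not tenant_id:
        raise ValueError("tenant_id is required for tenant-scoped queries")
    conditions = [{"key": "tenant_id", "match": {"value": tenant_id}}]
    conditions.extend(extra_conditions or [])
    return {"must": conditions}


# Usage: combine tenant isolation with an additional payload condition.
filters = tenant_filter(
    "customer-123",
    extra_conditions=[{"key": "lang", "match": {"value": "en"}}],
)
```

Rejecting an empty `tenant_id` up front means a missing tenant fails loudly instead of silently running an unscoped search.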
Best practices:
Enable payload indexing for frequently filtered fields
Use quantization (scalar or binary) for large datasets
Monitor disk usage and configure storage thresholds
Use gRPC instead of HTTP for lower latency
Chroma
Production configuration:
chroma:
  host: "${CHROMA_HOST:-localhost}"
  port: ${CHROMA_PORT:-8000}
  tenant: "default"
  database: "production_db"
Best practices:
Run Chroma server in Docker for production
Use persistent storage volumes
Implement connection pooling for concurrent requests
Monitor collection size and query latency
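The configuration examples in this guide use `${VAR}` and `${VAR:-default}` placeholders. How the config loader resolves them is not shown here, so the following is only a sketch of shell-style semantics: use the variable when it is set and non-empty, otherwise fall back to the default. `expand_env` is a hypothetical helper.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)


def expand_env(value: str, environ=None) -> str:
    """Replace ${NAME} and ${NAME:-default} placeholders in `value`."""
    environ = os.environ if environ is None else environ

    def replace(match):
        name = match.group("name")
        default = match.group("default")
        resolved = environ.get(name)
        if resolved:
            return resolved
        if default is not None:
            return default  # unset or empty, and a default was given
        if resolved is not None:
            return resolved  # set but empty, no default
        raise KeyError(f"Environment variable {name} is not set and has no default")

    return _PLACEHOLDER.sub(replace, value)
```

Raising on an unset variable without a default avoids passing a literal `${...}` string to a database client as a host name or API key.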
Logging and monitoring
Production logging configuration
Set appropriate log levels by environment:
logging:
  name: "vectordb_production"
  level: "${LOG_LEVEL:-WARNING}"
  format: "json"  # Structured logging for analysis
  handlers:
    - type: "file"
      path: "/var/log/vectordb/pipeline.log"
      max_bytes: 10485760  # 10 MB
      backup_count: 5
    - type: "console"
      level: "ERROR"
Custom logging setup:
import logging
from vectordb.utils.logging import LoggerFactory

# Configure structured logging
logger_factory = LoggerFactory(
    name="vectordb_production",
    log_level=logging.WARNING,
    log_format="json",
)
logger = logger_factory.get_logger()

# query_text and elapsed (in seconds) come from the surrounding request handler
logger.warning("High latency detected", extra={
    "query": query_text,
    "latency_ms": elapsed * 1000,
    "database": "pinecone",
    "top_k": 10,
})
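To show what JSON-structured output involves, here is a standard-library sketch of a formatter that serializes each record, including fields passed through `extra=`, as one JSON object per line. It is not the implementation behind `LoggerFactory`; `JsonFormatter` is a hypothetical class for illustration.

```python
import json
import logging

# Attributes present on every LogRecord; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__) | {
    "message",
    "asctime",
}


class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy fields supplied through `extra=` into the payload.
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-serializable values from breaking logging.
        return json.dumps(payload, default=str)
```

Attach it with `handler.setFormatter(JsonFormatter())` on any standard `logging.Handler`.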
Key metrics to monitor
Query latency
p50, p95, p99 latency by query type
Breakdown: embedding, retrieval, reranking, generation
Alert on p95 > SLO threshold
Retrieval quality
Online Recall@k and MRR
User feedback signals (clicks, dwell time)
Fallback rate (queries with no results)
Cost metrics
LLM API token usage per query
Embedding API costs
Database operations cost
Cost per 1000 queries
System health
Database connection errors
API rate limit hits
Retry and timeout rates
Error rates by type
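As a worked example of the latency metrics above, the sketch below computes p50, p95, and p99 from a list of recorded latencies using the nearest-rank method and flags an SLO breach. In a Prometheus setup these quantiles would normally be derived from histogram buckets instead; the function names and the SLO parameter are hypothetical.

```python
import math
from typing import Dict, Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the sample at rank ceil(pct/100 * n) in sorted order."""
    if not samples:
        raise ValueError("samples must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def latency_summary(samples_ms: Sequence[float], slo_p95_ms: float) -> Dict[str, object]:
    """Summarize latencies and report whether p95 exceeds the SLO threshold."""
    p95 = percentile(samples_ms, 95)
    return {
        "p50": percentile(samples_ms, 50),
        "p95": p95,
        "p99": percentile(samples_ms, 99),
        "slo_breached": p95 > slo_p95_ms,
    }
```

Running the summary separately per stage (embedding, retrieval, reranking, generation) gives the breakdown listed under query latency.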
Example monitoring implementation
import time
import logging
from prometheus_client import Counter, Histogram

# Define metrics
query_latency = Histogram(
    "vectordb_query_latency_seconds",
    "Query latency in seconds",
    ["database", "strategy"],
)
query_counter = Counter(
    "vectordb_queries_total",
    "Total queries processed",
    ["database", "status"],
)
error_counter = Counter(
    "vectordb_errors_total",
    "Total errors",
    ["database", "error_type"],
)

# Instrument pipeline
class MonitoredPipeline:
    def __init__(self, base_pipeline, database_name):
        self.pipeline = base_pipeline
        self.database = database_name
        self.logger = logging.getLogger(__name__)

    def search(self, query: str, top_k: int = 10):
        start = time.time()
        try:
            result = self.pipeline.search(query, top_k=top_k)

            # Record success metrics
            elapsed = time.time() - start
            query_latency.labels(
                database=self.database,
                strategy="semantic",
            ).observe(elapsed)
            query_counter.labels(
                database=self.database,
                status="success",
            ).inc()
            self.logger.info(
                "Query completed",
                extra={
                    "latency_ms": elapsed * 1000,
                    "num_results": len(result["documents"]),
                },
            )
            return result
        except Exception as e:
            # Record error metrics
            error_counter.labels(
                database=self.database,
                error_type=type(e).__name__,
            ).inc()
            query_counter.labels(
                database=self.database,
                status="error",
            ).inc()
            self.logger.error(
                f"Query failed: {e}",
                extra={
                    "query": query,
                    "error_type": type(e).__name__,
                },
                exc_info=True,
            )
            raise

# Usage
base_pipeline = PineconeSemanticSearchPipeline("config.yaml")
monitored_pipeline = MonitoredPipeline(base_pipeline, "pinecone")
result = monitored_pipeline.search("What is quantum computing?", top_k=10)
Error handling and resilience
Retry logic with exponential backoff
import logging
import time
from typing import TypeVar, Callable

T = TypeVar("T")

def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
) -> T:
    """Retry function with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            # Re-raise the original error once the final attempt has failed
            if attempt == max_retries - 1:
                raise
            delay = min(
                base_delay * (exponential_base ** attempt),
                max_delay,
            )
            logging.warning(
                f"Attempt {attempt + 1} failed: {e}. "
                f"Retrying in {delay:.1f}s..."
            )
            time.sleep(delay)
    # Only reached when max_retries is less than 1
    raise RuntimeError("Max retries exceeded")

# Usage
result = retry_with_backoff(
    lambda: pipeline.search(query, top_k=10),
    max_retries=3,
)
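The helper above retries every exception on a fixed schedule. Two refinements are common in production: retrying only errors that are likely transient (such as timeouts or rate limits), and adding random jitter so that many clients do not retry in lockstep. The sketch below is a variant under those assumptions; `retry_on` and its parameters are hypothetical, and the caller decides which exception types are retryable.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_on(
    func: Callable[[], T],
    retryable: Tuple[Type[BaseException], ...],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry only `retryable` exceptions, sleeping a random time up to the backoff cap."""
    for attempt in range(max_retries):
        try:
            return func()
        except retryable:
            if attempt == max_retries - 1:
                raise
            cap = min(base_delay * (2 ** attempt), max_delay)
            sleep(random.uniform(0, cap))  # "full jitter" backoff
    raise ValueError("max_retries must be at least 1")
```

Exceptions outside `retryable`, such as an authentication failure, propagate immediately, since repeating the call would not help. The injectable `sleep` argument exists so tests can run without waiting.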
Circuit breaker pattern
from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open

    def call(self, func):
        if self.state == "open":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func()
            if self.state == "half-open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = datetime.now()
            if self.failures >= self.failure_threshold:
                self.state = "open"
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60)
try:
    result = breaker.call(
        lambda: pipeline.search(query, top_k=10)
    )
except Exception as e:
    # Fall back to cached results or degraded service
    result = get_cached_results(query)
Cost-optimized configuration
Reduce costs while maintaining quality:
search:
  candidate_pool_size: 15  # Reduced from 50
  top_k: 10
cost_optimization:
  context_budget: 2000  # Max tokens for LLM
  model_tiering:
    routing: "llama-3.1-8b-instant"  # Cheaper model
    generation: "llama-3.3-70b-versatile"
  compression:
    enabled: true
    strategy: "extractive"
    num_sentences: 5
rag:
  enabled: true
  model: "${COST_OPTIMIZATION_MODEL_TIERING_GENERATION}"
  api_key: "${GROQ_API_KEY}"
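To illustrate what a `context_budget` implies, the sketch below keeps retrieved documents in ranked order until the next one would exceed the token budget. It is not VectorDB's implementation. The four-characters-per-token estimate is a rough assumption for English text, and both function names are hypothetical; use your model's tokenizer for accurate counts.

```python
from typing import List


def estimate_tokens(text: str) -> int:
    """Rough heuristic (assumption): about 4 characters per token."""
    return max(1, len(text) // 4)


def fit_to_budget(documents: List[str], context_budget: int = 2000) -> List[str]:
    """Keep documents in ranked order until the next one would exceed the budget."""
    selected: List[str] = []
    used = 0
    for doc in documents:
        cost = estimate_tokens(doc)
        if used + cost > context_budget:
            break
        selected.append(doc)
        used += cost
    return selected
```

Stopping at the first document that does not fit preserves ranking order, at the cost of possibly leaving some budget unused.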
Caching strategy
from functools import lru_cache

class CachedPipeline:
    def __init__(self, pipeline, cache_size=1000):
        self.pipeline = pipeline
        # Wrap the bound method so each instance gets its own LRU cache
        self._search_cached = lru_cache(maxsize=cache_size)(self._search)

    def _search(self, query: str, top_k: int):
        # Pass the original query text to the pipeline, not a hash of it
        return self.pipeline.search(query, top_k=top_k)

    def search(self, query: str, top_k: int = 10):
        # (query, top_k) is hashable, so lru_cache uses it as the cache key
        return self._search_cached(query, top_k)

# Usage
cached_pipeline = CachedPipeline(pipeline, cache_size=1000)
result = cached_pipeline.search("What is photosynthesis?", top_k=10)
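An LRU cache never expires entries, so results can go stale after the index is updated. The sketch below adds a time-to-live to each entry. `TTLCachedPipeline` and its parameters are hypothetical, it assumes only that the wrapped pipeline exposes `search(query, top_k=...)` as in the examples above, and it is not thread-safe as written.

```python
import time
from collections import OrderedDict


class TTLCachedPipeline:
    """Cache search results per (query, top_k) for a limited time."""

    def __init__(self, pipeline, ttl_seconds=300.0, max_entries=1000, clock=time.monotonic):
        self.pipeline = pipeline
        self.ttl_seconds = ttl_seconds
        self.max_entries = max_entries
        self._clock = clock  # injectable so tests can control time
        self._entries = OrderedDict()  # (query, top_k) -> (expires_at, result)

    def search(self, query: str, top_k: int = 10):
        key = (query, top_k)
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            self._entries.move_to_end(key)  # mark as recently used
            return entry[1]
        # Miss or expired: query the pipeline and store a fresh entry
        result = self.pipeline.search(query, top_k=top_k)
        self._entries[key] = (now + self.ttl_seconds, result)
        self._entries.move_to_end(key)
        while len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict least recently used
        return result
```

A short TTL bounds how long users can see results that predate a re-index, while still absorbing bursts of repeated queries.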
Deployment patterns
Containerized deployment
Sample Dockerfile:
FROM python:3.11-slim
WORKDIR /app
# Install uv
RUN pip install uv
# Copy dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen
# Copy application
COPY src/ ./src/
COPY configs/ ./configs/
# Set environment
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production
# Run application
CMD ["uv", "run", "python", "src/main.py"]
Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vectordb-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: vectordb-pipeline
  template:
    metadata:
      labels:
        app: vectordb-pipeline
    spec:
      containers:
        - name: pipeline
          image: vectordb-pipeline:latest
          env:
            - name: ENVIRONMENT
              value: "production"
            - name: PINECONE_API_KEY
              valueFrom:
                secretKeyRef:
                  name: vectordb-secrets
                  key: pinecone-api-key
            - name: GROQ_API_KEY
              valueFrom:
                secretKeyRef:
                  name: vectordb-secrets
                  key: groq-api-key
          resources:
            requests:
              memory: "2Gi"
              cpu: "1"
            limits:
              memory: "4Gi"
              cpu: "2"
Next steps
Benchmarking: Validate production performance with benchmarks
Configuration: Fine-tune production settings
Environment variables: Complete reference for production credentials
Building RAG pipelines: Learn core RAG pipeline concepts