This guide covers best practices, configuration patterns, and deployment strategies for running VectorDB pipelines in production.

Deployment checklist

Before deploying to production, ensure you have:
  • Benchmarked retrieval quality on representative queries
  • Tuned top_k, candidate_pool_size, and reranking settings
  • Set up monitoring and observability
  • Configured proper logging levels
  • Secured API keys using environment variables
  • Tested error handling and fallback behavior
  • Established cost budgets and alerts
  • Implemented rate limiting for LLM calls
  • Validated latency meets SLO requirements
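The rate-limiting item in the checklist can be met in several ways. One common approach is a token bucket, sketched below. The `TokenBucket` class and its parameters are hypothetical names for illustration and are not part of VectorDB.

```python
import time


class TokenBucket:
    """Allow `rate` calls per second on average, with bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self.tokens = float(capacity)
        self.updated = clock()

    def try_acquire(self) -> bool:
        """Take one token if available; False means the caller should wait or shed load."""
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

A caller would check `try_acquire()` before each LLM request and queue or reject the request when it returns `False`.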

Environment configuration

Production environment variables

Use environment variables for all secrets and environment-specific settings:
.env.production
# Vector Database Credentials
PINECONE_API_KEY=pc-prod-xxxx
WEAVIATE_URL=https://prod-cluster.weaviate.network
WEAVIATE_API_KEY=weaviate-prod-key
MILVUS_URI=https://prod-milvus.example.com:19530
MILVUS_TOKEN=milvus-prod-token
QDRANT_URL=https://qdrant-prod.example.com
QDRANT_API_KEY=qdrant-prod-key

# LLM API Keys
GROQ_API_KEY=gsk_prod_xxxx
OPENAI_API_KEY=sk-prod-xxxx
COHERE_API_KEY=cohere-prod-xxxx

# Deployment Settings
LOG_LEVEL=WARNING
ENABLE_TELEMETRY=true
MAX_RETRIES=3
TIMEOUT_SECONDS=30
See the environment variables reference for the complete list.
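It can help to validate these variables once at startup so that a missing secret fails the deployment immediately rather than the first query. The sketch below assumes a hypothetical `load_settings` helper and a `REQUIRED_VARS` list that you would adapt to the backends you actually use. Neither is part of VectorDB.

```python
import os

# Hypothetical list: adjust to the databases and LLM providers you deploy.
REQUIRED_VARS = ["PINECONE_API_KEY", "GROQ_API_KEY"]


def load_settings(environ=os.environ) -> dict:
    """Fail fast on missing secrets, then parse the tunable deployment settings."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return {
        "log_level": environ.get("LOG_LEVEL", "WARNING"),
        "max_retries": int(environ.get("MAX_RETRIES", "3")),
        "timeout_seconds": float(environ.get("TIMEOUT_SECONDS", "30")),
    }
```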

Configuration file structure

Organize configurations by environment:
configs/
├── base.yaml              # Shared settings
├── development.yaml       # Dev overrides
├── staging.yaml          # Staging overrides
└── production.yaml       # Production settings
Base configuration (configs/base.yaml):
embeddings:
  model: "Qwen/Qwen3-Embedding-0.6B"
  batch_size: 32
  device: "cpu"

logging:
  name: "vectordb_pipeline"
  level: "INFO"
Production overrides (configs/production.yaml):
embeddings:
  batch_size: 64  # Higher throughput in production
  device: "cuda"  # GPU acceleration

search:
  top_k: 10
  candidate_pool_size: 15  # Cost-optimized

rag:
  enabled: true
  model: "llama-3.3-70b-versatile"
  api_key: "${GROQ_API_KEY}"
  temperature: 0.7
  max_tokens: 2048

logging:
  level: "WARNING"  # Reduce log volume

retry:
  max_attempts: 3
  backoff_factor: 2
Load environment-specific config:
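import os
from vectordb.utils.config_loader import ConfigLoader

# PineconeSemanticSearchPipeline must also be imported from the vectordb
# package; its module path is not shown on this page.

# Select the config file from the ENVIRONMENT variable (defaults to development)
env = os.getenv("ENVIRONMENT", "development")
config_path = f"configs/{env}.yaml"

pipeline = PineconeSemanticSearchPipeline(config_path)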
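The page does not say how the base file and an environment file are combined, or whether `ConfigLoader` does this for you. If you need to layer them yourself, a recursive merge is one option. The sketch below uses plain dictionaries that mirror the YAML above. `deep_merge` is a hypothetical helper, not a VectorDB function.

```python
import copy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a new dict where `override` wins, merging nested dicts key by key."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


base = {
    "embeddings": {"model": "Qwen/Qwen3-Embedding-0.6B", "batch_size": 32, "device": "cpu"},
    "logging": {"name": "vectordb_pipeline", "level": "INFO"},
}
production = {
    "embeddings": {"batch_size": 64, "device": "cuda"},
    "logging": {"level": "WARNING"},
}

# Production values replace base values; untouched keys such as the model survive.
config = deep_merge(base, production)
```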

Database-specific deployment considerations

Pinecone

Namespace strategy:
pinecone:
  api_key: "${PINECONE_API_KEY}"
  index_name: "production-index"
  namespace: "v1"  # Version namespaces for zero-downtime updates
  dimension: 1024
  metric: "cosine"
  recreate: false  # Never recreate in production
Multi-tenancy: Use namespaces for tenant isolation (scales to 100,000+ tenants):
for tenant_id in tenant_ids:
    result = pipeline.search(
        query=user_query,
        top_k=10,
        namespace=f"tenant-{tenant_id}"
    )
Best practices:
  • Monitor pod utilization and scale replicas based on QPS
  • Use serverless indexes for variable workloads
  • Implement retry logic for rate limit errors (429)

Weaviate

Connection configuration:
weaviate:
  cluster_url: "${WEAVIATE_URL}"
  api_key: "${WEAVIATE_API_KEY}"
  timeout: 30
  connection_pool_size: 10
Multi-tenancy: Weaviate supports native multi-tenancy with per-tenant shards:
# Initialize with tenant support
pipeline = WeaviateSemanticSearchPipeline(
    config_path,
    tenant="customer-123"
)
Best practices:
  • Use batch imports for initial indexing (100+ docs/batch)
  • Enable quantization (PQ or BQ) to cut vector memory by roughly 4x or more, depending on the method and its settings
  • Monitor shard health and replication status
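The batch-import advice applies regardless of which client call performs the write. A small generic helper for splitting a document stream into fixed-size batches might look like the sketch below. `batched` is a hypothetical name, and the actual import call is left to your Weaviate client code.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 100) -> Iterator[List[T]]:
    """Yield consecutive lists of up to `batch_size` items; the last may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Each yielded batch can then be passed to whatever bulk-insert method your indexing pipeline exposes.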

Milvus

Production configuration:
milvus:
  uri: "${MILVUS_URI}"
  token: "${MILVUS_TOKEN}"
  collection_name: "production_collection"
  dimension: 1024
  recreate: false
  batch_size: 100
Partition-based multi-tenancy:
# Scales to millions of tenants using partition keys
config = {
    "milvus": {
        "partition_key": "tenant_id",
        "num_partitions": 1000
    }
}
Best practices:
  • Use scalar quantization (SQ8) for 4x storage reduction
  • Enable partition pruning with metadata filters
  • Monitor memory usage per collection
  • Set appropriate index_file_size for write throughput
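The 4x figure for SQ8 follows from component width: a float32 component takes 4 bytes and an 8-bit quantized component takes 1. The arithmetic below illustrates this for the 1024-dimension vectors configured above. The collection size of one million vectors is an arbitrary assumption, and the estimate covers raw vector data only, not index structures or metadata.

```python
def vector_storage_bytes(num_vectors: int, dimension: int, bytes_per_component: int) -> int:
    """Raw vector payload size, excluding index overhead and scalar fields."""
    return num_vectors * dimension * bytes_per_component


NUM_VECTORS = 1_000_000  # assumed collection size, for illustration only
DIMENSION = 1024         # matches the `dimension` setting above

float32_size = vector_storage_bytes(NUM_VECTORS, DIMENSION, 4)  # 4 bytes per float32
sq8_size = vector_storage_bytes(NUM_VECTORS, DIMENSION, 1)      # 1 byte per int8

reduction = float32_size / sq8_size
```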

Qdrant

Production setup:
qdrant:
  url: "${QDRANT_URL}"
  api_key: "${QDRANT_API_KEY}"
  collection_name: "production_docs"
  timeout: 30
  prefer_grpc: true  # Better performance
Payload-based multi-tenancy:
# Use payload filters for tenant isolation
filters = {
    "must": [
        {"key": "tenant_id", "match": {"value": tenant_id}}
    ]
}

result = pipeline.search(
    query=user_query,
    top_k=10,
    filters=filters
)
Best practices:
  • Enable payload indexing for frequently filtered fields
  • Use quantization (scalar or binary) for large datasets
  • Monitor disk usage and configure storage thresholds
  • Use gRPC instead of HTTP for lower latency

Chroma

Production configuration:
chroma:
  host: "${CHROMA_HOST:-localhost}"
  port: ${CHROMA_PORT:-8000}
  tenant: "default"
  database: "production_db"
Best practices:
  • Run Chroma server in Docker for production
  • Use persistent storage volumes
  • Implement connection pooling for concurrent requests
  • Monitor collection size and query latency
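Several configuration snippets on this page use `${VAR}` and `${VAR:-default}` placeholders. The page does not show how they are resolved, so the sketch below is only one possible interpretation, modelled on shell parameter expansion. `expand_placeholders` is a hypothetical helper. Unlike a shell, it raises when a variable without a default is unset or empty, which is a deliberate choice for catching missing secrets.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)


def expand_placeholders(text: str, environ=os.environ) -> str:
    """Substitute placeholders from `environ`, falling back to inline defaults."""

    def replace(match):
        name, default = match.group("name"), match.group("default")
        value = environ.get(name)
        if value:  # set and non-empty
            return value
        if default is not None:
            return default
        raise KeyError(f"Environment variable {name} is not set and has no default")

    return _PLACEHOLDER.sub(replace, text)
```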

Logging and monitoring

Production logging configuration

Set appropriate log levels by environment:
logging:
  name: "vectordb_production"
  level: "${LOG_LEVEL:-WARNING}"
  format: "json"  # Structured logging for analysis
  handlers:
    - type: "file"
      path: "/var/log/vectordb/pipeline.log"
      max_bytes: 10485760  # 10MB
      backup_count: 5
    - type: "console"
      level: "ERROR"
Custom logging setup:
import logging
from vectordb.utils.logging import LoggerFactory

# Configure structured logging
logger_factory = LoggerFactory(
    name="vectordb_production",
    log_level=logging.WARNING,
    log_format="json"
)
logger = logger_factory.get_logger()

logger.warning("High latency detected", extra={
    "query": query_text,
    "latency_ms": elapsed * 1000,
    "database": "pinecone",
    "top_k": 10
})
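If you want structured output without relying on the library's logger, the standard `logging` module can produce it. The formatter below is a minimal sketch that is independent of `LoggerFactory`. It emits each record as one JSON object and includes any fields passed through `extra=`. The `JsonFormatter` name is an assumption for illustration.

```python
import json
import logging

# Attribute names present on every LogRecord; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
}


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy caller-supplied fields such as latency_ms or database.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-serializable values from breaking the log line.
        return json.dumps(payload, default=str)
```

Attach it with `handler.setFormatter(JsonFormatter())` on any standard handler.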

Key metrics to monitor

Query latency

  • p50, p95, p99 latency by query type
  • Breakdown: embedding, retrieval, reranking, generation
  • Alert on p95 > SLO threshold
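For offline checks, or when a metrics backend is not available, percentiles can be computed directly from recorded latencies. The sketch below uses the nearest-rank definition. `percentile` and `breaches_slo` are hypothetical helper names, and other percentile definitions (for example, interpolated ones) will give slightly different values.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


def breaches_slo(samples: Sequence[float], slo_seconds: float, pct: float = 95.0) -> bool:
    """True when the chosen percentile exceeds the SLO threshold."""
    return percentile(samples, pct) > slo_seconds
```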

Retrieval quality

  • Online Recall@k and MRR
  • User feedback signals (clicks, dwell time)
  • Fallback rate (queries with no results)
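Recall@k and MRR both require relevance labels, which in an online setting usually come from a labelled sample or from feedback signals treated as a proxy. Assuming you have a set of relevant document IDs for each query, the metrics can be sketched as follows. The function names are illustrative and not part of VectorDB.

```python
from typing import Iterable, Sequence, Set, Tuple


def recall_at_k(retrieved: Sequence[str], relevant: Set[str], k: int) -> float:
    """Fraction of relevant documents that appear in the top k results."""
    if not relevant:
        return 0.0
    hits = len(set(retrieved[:k]) & relevant)
    return hits / len(relevant)


def reciprocal_rank(retrieved: Sequence[str], relevant: Set[str]) -> float:
    """1 / rank of the first relevant result, or 0.0 if none was retrieved."""
    for rank, doc_id in enumerate(retrieved, start=1):
        if doc_id in relevant:
            return 1.0 / rank
    return 0.0


def mean_reciprocal_rank(runs: Iterable[Tuple[Sequence[str], Set[str]]]) -> float:
    """Average reciprocal rank over (retrieved, relevant) pairs, one pair per query."""
    ranks = [reciprocal_rank(retrieved, relevant) for retrieved, relevant in runs]
    return sum(ranks) / len(ranks) if ranks else 0.0
```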

Cost metrics

  • LLM API token usage per query
  • Embedding API costs
  • Database operations cost
  • Cost per 1000 queries
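Cost per 1000 queries can be derived from average token counts and your provider's per-token prices. The function below is a rough sketch. Its name and parameters are hypothetical, and the prices you pass in should come from your own provider's current rate card.

```python
def cost_per_1000_queries(
    prompt_tokens: float,
    completion_tokens: float,
    prompt_price_per_million: float,
    completion_price_per_million: float,
    fixed_cost_per_query: float = 0.0,
) -> float:
    """Estimate spend for 1000 queries from average per-query token usage.

    `fixed_cost_per_query` can carry embedding and database costs that do
    not depend on LLM tokens.
    """
    llm_cost = (
        prompt_tokens * prompt_price_per_million
        + completion_tokens * completion_price_per_million
    ) / 1_000_000
    return 1000 * (llm_cost + fixed_cost_per_query)
```

Tracking this number per release makes it easier to see whether a change in `candidate_pool_size` or `max_tokens` moved the bill.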

System health

  • Database connection errors
  • API rate limit hits
  • Retry and timeout rates
  • Error rates by type

Example monitoring implementation

import time
import logging
from prometheus_client import Counter, Histogram

# Define metrics
query_latency = Histogram(
    "vectordb_query_latency_seconds",
    "Query latency in seconds",
    ["database", "strategy"]
)

query_counter = Counter(
    "vectordb_queries_total",
    "Total queries processed",
    ["database", "status"]
)

error_counter = Counter(
    "vectordb_errors_total",
    "Total errors",
    ["database", "error_type"]
)

# Instrument pipeline
class MonitoredPipeline:
    def __init__(self, base_pipeline, database_name):
        self.pipeline = base_pipeline
        self.database = database_name
        self.logger = logging.getLogger(__name__)
    
    def search(self, query: str, top_k: int = 10):
        start = time.perf_counter()  # monotonic clock, unaffected by wall-clock adjustments

        try:
            result = self.pipeline.search(query, top_k=top_k)

            # Record success metrics
            elapsed = time.perf_counter() - start
            query_latency.labels(
                database=self.database,
                strategy="semantic"
            ).observe(elapsed)
            
            query_counter.labels(
                database=self.database,
                status="success"
            ).inc()
            
            self.logger.info(
                "Query completed",
                extra={
                    "latency_ms": elapsed * 1000,
                    "num_results": len(result["documents"])
                }
            )
            
            return result
            
        except Exception as e:
            # Record error metrics
            error_counter.labels(
                database=self.database,
                error_type=type(e).__name__
            ).inc()
            
            query_counter.labels(
                database=self.database,
                status="error"
            ).inc()
            
            self.logger.error(
                f"Query failed: {str(e)}",
                extra={
                    "query": query,
                    "error_type": type(e).__name__
                },
                exc_info=True
            )
            
            raise

# Usage
base_pipeline = PineconeSemanticSearchPipeline("config.yaml")
monitored_pipeline = MonitoredPipeline(base_pipeline, "pinecone")

result = monitored_pipeline.search("What is quantum computing?", top_k=10)

Error handling and resilience

Retry logic with exponential backoff

import logging
import time
from typing import TypeVar, Callable

T = TypeVar('T')

def retry_with_backoff(
    func: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0
) -> T:
    """Retry function with exponential backoff."""
    
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = min(
                base_delay * (exponential_base ** attempt),
                max_delay
            )
            
            logging.warning(
                f"Attempt {attempt + 1} failed: {str(e)}. "
                f"Retrying in {delay:.1f}s..."
            )
            
            time.sleep(delay)
    
    raise RuntimeError("Max retries exceeded")

# Usage
result = retry_with_backoff(
    lambda: pipeline.search(query, top_k=10),
    max_retries=3
)
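When many clients retry on the same schedule, their retries can arrive in synchronized bursts. Adding random jitter to the delay is a common mitigation. The sketch below computes a "full jitter" delay, drawn uniformly between zero and the capped exponential delay. `backoff_delay` is a hypothetical helper whose parameters mirror those of `retry_with_backoff`.

```python
import random


def backoff_delay(
    attempt: int,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential_base: float = 2.0,
    rng=random.random,  # injectable for deterministic tests
) -> float:
    """Full-jitter delay: uniform in [0, min(base * exp_base**attempt, max_delay))."""
    ceiling = min(base_delay * (exponential_base ** attempt), max_delay)
    return rng() * ceiling
```

Inside a retry loop, `time.sleep(backoff_delay(attempt))` would replace the fixed exponential delay.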

Circuit breaker pattern

from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half-open
    
    def call(self, func):
        if self.state == "open":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "half-open"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func()
            # Any success closes the circuit and clears the failure count,
            # so only consecutive failures can trip the breaker
            self.state = "closed"
            self.failures = 0
            return result
        except Exception as e:
            self.failures += 1
            self.last_failure_time = datetime.now()
            
            if self.failures >= self.failure_threshold:
                self.state = "open"
            
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

try:
    result = breaker.call(
        lambda: pipeline.search(query, top_k=10)
    )
except Exception:
    # Fall back to cached results or degraded service;
    # get_cached_results is an application-provided helper, not shown here
    result = get_cached_results(query)

Performance optimization

Cost-optimized configuration

Reduce costs while maintaining quality:
search:
  candidate_pool_size: 15  # Reduced from 50
  top_k: 10

cost_optimization:
  context_budget: 2000  # Max tokens for LLM
  model_tiering:
    routing: "llama-3.1-8b-instant"  # Cheaper model
    generation: "llama-3.3-70b-versatile"
  compression:
    enabled: true
    strategy: "extractive"
    num_sentences: 5

rag:
  enabled: true
  model: "${COST_OPTIMIZATION_MODEL_TIERING_GENERATION}"
  api_key: "${GROQ_API_KEY}"
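How `context_budget` is enforced is not described on this page. One simple interpretation is to keep retrieved documents in ranked order until the budget is exhausted, as in the sketch below. `fit_to_budget` and `approx_tokens` are hypothetical helpers, and the whitespace-based token count is a crude stand-in for a real tokenizer.

```python
from typing import List


def approx_tokens(text: str) -> int:
    # Crude whitespace count; a real tokenizer will give different numbers.
    return len(text.split())


def fit_to_budget(documents: List[str], budget: int) -> List[str]:
    """Keep documents in ranked order until the next one would exceed the budget."""
    kept: List[str] = []
    used = 0
    for doc in documents:
        cost = approx_tokens(doc)
        if used + cost > budget:
            break
        kept.append(doc)
        used += cost
    return kept
```

Stopping at the first document that does not fit preserves ranking order. Skipping it and trying smaller documents further down is an alternative that fills the budget more tightly, at the cost of passing lower-ranked context.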

Caching strategy

from functools import lru_cache

class CachedPipeline:
    def __init__(self, pipeline, cache_size=1000):
        self.pipeline = pipeline
        # Wrap per instance so each pipeline gets its own bounded cache
        self._search_cached = lru_cache(maxsize=cache_size)(self._search)
    
    def _search(self, query: str, top_k: int):
        return self.pipeline.search(query, top_k=top_k)
    
    def search(self, query: str, top_k: int = 10):
        # lru_cache keys on (query, top_k); strings are hashable, so the
        # query text itself is the cache key and is what gets searched
        return self._search_cached(query, top_k)

# Usage
cached_pipeline = CachedPipeline(pipeline, cache_size=1000)
result = cached_pipeline.search("What is photosynthesis?", top_k=10)
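An LRU cache never expires entries, so cached results can go stale after the index is updated. Where that matters, a time-to-live cache is one alternative. The sketch below is a minimal version. `TTLCache` is a hypothetical class, and it does not bound its size, so a production version would also need eviction.

```python
import time
from typing import Any, Callable, Dict, Hashable, Tuple


class TTLCache:
    """Cache values for `ttl_seconds`, recomputing them once they expire."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._entries: Dict[Hashable, Tuple[float, Any]] = {}

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] < self.ttl:
            return entry[1]  # still fresh
        value = compute()
        self._entries[key] = (now, value)
        return value
```

A search wrapper could call `cache.get_or_compute((query, top_k), lambda: pipeline.search(query, top_k=top_k))`.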

Deployment patterns

Containerized deployment

Sample Dockerfile:
FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

# Copy dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen

# Copy application
COPY src/ ./src/
COPY configs/ ./configs/

# Set environment
ENV PYTHONPATH=/app
ENV ENVIRONMENT=production

# Run application
CMD ["uv", "run", "python", "src/main.py"]

Kubernetes deployment

deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vectordb-pipeline
spec:
  replicas: 3
  selector:
    matchLabels:
      app: vectordb-pipeline
  template:
    metadata:
      labels:
        app: vectordb-pipeline
    spec:
      containers:
      - name: pipeline
        image: vectordb-pipeline:latest  # Prefer a pinned version tag or digest in production
        env:
        - name: ENVIRONMENT
          value: "production"
        - name: PINECONE_API_KEY
          valueFrom:
            secretKeyRef:
              name: vectordb-secrets
              key: pinecone-api-key
        - name: GROQ_API_KEY
          valueFrom:
            secretKeyRef:
              name: vectordb-secrets
              key: groq-api-key
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"

Next steps

Benchmarking

Validate production performance with benchmarks

Configuration

Fine-tune production settings

Environment variables

Complete reference for production credentials

Building RAG pipelines

Learn core RAG pipeline concepts
