Deploying LangGraph applications requires careful consideration of persistence, scalability, monitoring, and infrastructure.

Deployment Options

LangSmith Deployment

The easiest way to deploy LangGraph applications:
# Install CLI
pip install langgraph-cli

# Create a new project from a template
langgraph new

# Deploy to LangSmith
langgraph deploy
Benefits:
  • Managed infrastructure
  • Built-in observability
  • Automatic scaling
  • Production checkpointers
  • LangGraph Studio integration
LangSmith Deployment handles persistence, scaling, and monitoring automatically.

Self-Hosted Deployment

For self-hosted deployments, you’ll need to configure:
  1. Web server (FastAPI, Flask)
  2. Persistent checkpointer (PostgreSQL, SQLite)
  3. Message queue (for async processing)
  4. Load balancer
  5. Monitoring and logging

Production Setup

1. Configure Checkpointer

Use a persistent checkpointer:
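import os

from langgraph.checkpoint.postgres import PostgresSaver

# Production database, e.g. postgresql://user:password@db:5432/langgraph
DB_URI = os.environ["DATABASE_URL"]

# `graph` is your StateGraph builder (not yet compiled).
# The connection is closed when the `with` block exits, so run the graph inside it.
# For async code (ainvoke/astream), use AsyncPostgresSaver from
# langgraph.checkpoint.postgres.aio instead.
with PostgresSaver.from_conn_string(DB_URI) as checkpointer:
    checkpointer.setup()  # creates the checkpoint tables; safe to call on every start
    app = graph.compile(checkpointer=checkpointer)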

2. Create API Server

Wrap your graph in a web API:
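import json
import os
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from pydantic import BaseModel

DB_URI = os.environ["DATABASE_URL"]

@asynccontextmanager
async def lifespan(api: FastAPI):
    """Open the checkpointer at startup and keep it open until shutdown."""
    global app
    # ainvoke/astream need an async checkpointer
    async with AsyncPostgresSaver.from_conn_string(DB_URI) as checkpointer:
        await checkpointer.setup()
        # `graph` is your StateGraph builder (not yet compiled)
        app = graph.compile(checkpointer=checkpointer)
        yield

api = FastAPI(lifespan=lifespan)

class InvokeRequest(BaseModel):
    input: dict
    thread_id: str

class InvokeResponse(BaseModel):
    output: dict
    thread_id: str

@api.post("/invoke", response_model=InvokeResponse)
async def invoke_graph(request: InvokeRequest):
    """Run the graph to completion on the given thread."""
    config = {"configurable": {"thread_id": request.thread_id}}
    try:
        result = await app.ainvoke(request.input, config)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return InvokeResponse(output=result, thread_id=request.thread_id)

@api.post("/stream")
async def stream_graph(request: InvokeRequest):
    """Stream graph execution as Server-Sent Events."""
    config = {"configurable": {"thread_id": request.thread_id}}

    async def event_generator():
        async for chunk in app.astream(request.input, config):
            # default=str keeps non-JSON values (e.g. message objects) from raising
            yield f"data: {json.dumps(chunk, default=str)}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

if __name__ == "__main__":
    uvicorn.run(api, host="0.0.0.0", port=8000)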
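Clients of the /stream endpoint receive one "data: <json>" line per chunk, each followed by a blank line. A minimal client-side parser sketch, assuming the server emits only data: fields as in the endpoint above; parse_sse is a hypothetical helper, not part of LangGraph or FastAPI:

```python
import json
from typing import Iterable, Iterator

def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each complete Server-Sent Event.

    Only `data:` fields are handled (the /stream endpoint emits nothing else);
    comment lines and other fields such as `event:` or `id:` are ignored.
    """
    data: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the event
            if data:
                yield json.loads("\n".join(data))
                data = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE format strips a single leading space after the colon
            data.append(value[1:] if value.startswith(" ") else value)
    # Per the SSE format, an event not followed by a blank line is discarded
```

To consume the endpoint from Python, iterate over the response returned by urllib.request.urlopen (it yields raw byte lines) and decode each line before passing it to parse_sse.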

3. Add Health Checks

@api.get("/health")
async def health_check():
    """Health check endpoint: reads a checkpoint to confirm the database is reachable."""
    try:
        # Async server, so use the async state accessor
        await app.aget_state({"configurable": {"thread_id": "health-check"}})
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Unhealthy: {e}")
    return {"status": "healthy"}
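A check that talks to the database can hang instead of failing when the network stalls, and Kubernetes probes give up after timeoutSeconds (1 second by default). A minimal sketch that bounds the check with asyncio.wait_for; the helper name and the 0.5-second default are assumptions, chosen to stay below the default probe timeout:

```python
import asyncio
from typing import Awaitable, Callable

async def run_health_check(
    check: Callable[[], Awaitable[object]], timeout: float = 0.5
) -> tuple[bool, str]:
    """Run one dependency check, reporting failure instead of hanging.

    Returns (healthy, detail). A check that exceeds `timeout` seconds counts
    as unhealthy, so a stalled database cannot block the probe indefinitely.
    """
    try:
        await asyncio.wait_for(check(), timeout=timeout)
    except asyncio.TimeoutError:
        return False, f"timed out after {timeout}s"
    except Exception as exc:  # any error from the dependency means unhealthy
        return False, f"failed: {exc}"
    return True, "ok"
```

Inside the endpoint, pass a zero-argument callable such as lambda: app.aget_state({"configurable": {"thread_id": "health-check"}}) and raise HTTPException(status_code=503) when the first element of the result is False.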

4. Configure Environment

Load settings from environment variables and an optional .env file:
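from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Fields map to environment variables case-insensitively (DATABASE_URL, ...)
    database_url: str
    openai_api_key: str
    anthropic_api_key: str
    log_level: str = "INFO"
    max_workers: int = 4

    # Also read variables from a local .env file if present
    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()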

Containerization

Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Run server
CMD ["uvicorn", "main:api", "--host", "0.0.0.0", "--port", "8000"]

Docker Compose

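services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:password@db:5432/langgraph
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    depends_on:
      db:
        condition: service_healthy  # wait until Postgres accepts connections

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=langgraph
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U user -d langgraph"]
      interval: 5s
      timeout: 5s
      retries: 5
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data: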

Kubernetes Deployment

Deployment Manifest

apiVersion: apps/v1
kind: Deployment
metadata:
  name: langgraph-app
spec:
  replicas: 3
  selector:
    matchLabels:
      app: langgraph
  template:
    metadata:
      labels:
        app: langgraph
    spec:
      containers:
      - name: app
        image: your-registry/langgraph-app:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: database-url
        - name: OPENAI_API_KEY
          valueFrom:
            secretKeyRef:
              name: app-secrets
              key: openai-api-key
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10

Service Manifest

apiVersion: v1
kind: Service
metadata:
  name: langgraph-service
spec:
  selector:
    app: langgraph
  ports:
  - protocol: TCP
    port: 80
    targetPort: 8000
  type: LoadBalancer

Scaling Considerations

Horizontal Scaling

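Because thread state lives in the shared checkpointer rather than in process memory, any replica can serve any thread, so LangGraph applications scale horizontally. Share database connections across requests with a connection pool:
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg.rows import dict_row
from psycopg_pool import ConnectionPool

# Use connection pooling; the saver expects autocommit connections returning dict rows
# (these kwargs follow LangGraph's Postgres checkpointer examples)
pool = ConnectionPool(
    conninfo=DB_URI,
    min_size=2,
    max_size=10,
    kwargs={"autocommit": True, "prepare_threshold": 0, "row_factory": dict_row},
    open=True,
)

checkpointer = PostgresSaver(pool)
checkpointer.setup()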
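Each replica can open up to max_size connections, so the database must accept replicas × max_size of them, plus any Celery workers and admin sessions. With the Deployment manifest's 3 replicas and max_size=10 that is 30 connections, well under PostgreSQL's default max_connections of 100. A small hypothetical helper (not part of psycopg or LangGraph) that works backwards from the server limit; the reserved headroom of 10 is an arbitrary assumption:

```python
def pool_max_size(max_connections: int, replicas: int, reserved: int = 10) -> int:
    """Largest per-replica pool max_size that keeps all replicas within the server limit.

    `reserved` is headroom for other clients (Celery workers, migrations,
    psql sessions); the default of 10 is an assumption, not a PostgreSQL setting.
    """
    available = max_connections - reserved
    if replicas < 1 or available < replicas:
        raise ValueError("not enough connections for at least one per replica")
    return available // replicas
```

For example, pool_max_size(100, 3) gives 30, but scaling the Deployment to 12 replicas drops the safe max_size to 7, so revisit the pool size whenever the replica count changes.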

Async Processing

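Offload long-running workflows to a task queue so the request returns immediately with a task ID:
from celery import Celery
from celery.result import AsyncResult

# Redis as broker and result backend (the backend is needed to fetch results later)
celery_app = Celery(
    'langgraph',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
)

@celery_app.task
def process_graph(input_data: dict, thread_id: str):
    """Process graph in background."""
    # Workers call `app` synchronously, so compile it in the worker process
    # with a sync checkpointer (e.g. PostgresSaver over a connection pool).
    config = {"configurable": {"thread_id": thread_id}}
    result = app.invoke(input_data, config)
    return result  # must be serializable by Celery (JSON by default)

# API endpoints
@api.post("/invoke-async")
async def invoke_async(request: InvokeRequest):
    task = process_graph.delay(request.input, request.thread_id)
    return {"task_id": task.id}

@api.get("/tasks/{task_id}")
async def get_task(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    return {"status": result.status, "result": result.result if result.successful() else None}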

Caching

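LangGraph can cache the results of individual nodes. Compile the graph with a cache backend, then give each node whose output should be reused a cache policy:
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

# Only nodes added with a cache_policy are cached; entries expire after `ttl` seconds.
# `expensive_node` is a placeholder for one of your node functions.
graph.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=300))

# InMemoryCache lives in one process: each replica keeps its own cache
app = graph.compile(
    checkpointer=checkpointer,
    cache=InMemoryCache(),
)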

Monitoring

LangSmith Integration

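import os

# Enable LangSmith tracing. In production, set these variables in the deployment
# environment (e.g. from a Kubernetes Secret) rather than in code.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_PROJECT"] = "production-app"
# LANGCHAIN_API_KEY must already be set in the environment; never hardcode it.
# Newer LangSmith SDKs also accept LANGSMITH_TRACING, LANGSMITH_API_KEY and LANGSMITH_PROJECT.

# Traces are sent to LangSmith automatically
result = app.invoke(input_data, config)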

Custom Metrics

import time

from prometheus_client import Counter, Histogram, make_asgi_app

# Define metrics
invocation_counter = Counter(
    'langgraph_invocations_total',
    'Total graph invocations',
    ['status']
)

invocation_duration = Histogram(
    'langgraph_invocation_duration_seconds',
    'Graph invocation duration'
)

# Expose metrics for Prometheus to scrape at /metrics
api.mount("/metrics", make_asgi_app())

# Instrument code
def invoke_with_metrics(input_data, config):
    start = time.perf_counter()  # monotonic clock, unaffected by system time changes

    try:
        result = app.invoke(input_data, config)
        invocation_counter.labels(status='success').inc()
        return result
    except Exception:
        invocation_counter.labels(status='error').inc()
        raise
    finally:
        invocation_duration.observe(time.perf_counter() - start)

Logging

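import json
import logging
from datetime import datetime, timezone

# Structured logging; without a configured handler, INFO records are dropped
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class StructuredLogger:
    @staticmethod
    def log_invocation(thread_id: str, input_data: dict, result: dict):
        logger.info(json.dumps({
            "event": "graph_invocation",
            "thread_id": thread_id,
            "input": input_data,
            "output": result,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }, default=str))  # default=str serializes non-JSON values such as messages

# Use in API
@api.post("/invoke")
async def invoke_graph(request: InvokeRequest):
    config = {"configurable": {"thread_id": request.thread_id}}
    result = await app.ainvoke(request.input, config)
    StructuredLogger.log_invocation(
        request.thread_id,
        request.input,
        result,
    )
    return result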
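The StructuredLogger above builds each JSON payload by hand at the call site. The standard library can do this centrally instead: a custom logging.Formatter turns every record into one JSON line, so ordinary logger.info calls become structured. A sketch; JsonFormatter and configure_json_logging are hypothetical names, and passing fields through extra={"fields": {...}} is a convention assumed here, not a logging built-in:

```python
import json
import logging
from datetime import datetime, timezone

class JsonFormatter(logging.Formatter):
    """Format each log record as a single JSON object on one line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Structured fields attached via extra={"fields": {...}} (assumed convention)
        payload.update(getattr(record, "fields", {}))
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-JSON values (e.g. message objects) from raising
        return json.dumps(payload, default=str)

def configure_json_logging(level: int = logging.INFO) -> None:
    """Replace the root handlers with one that writes JSON lines to stderr."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logging.basicConfig(level=level, handlers=[handler], force=True)
```

After calling configure_json_logging() once at startup, a call such as logger.info("graph_invocation", extra={"fields": {"thread_id": request.thread_id}}) produces one JSON line with the thread ID as a top-level key.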

Security

Authentication

from fastapi import Depends, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

security = HTTPBearer()

def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the bearer token and return the authenticated user."""
    token = credentials.credentials

    # is_valid_token and get_user_from_token are placeholders for your own
    # JWT or identity-provider verification logic
    if not is_valid_token(token):
        raise HTTPException(status_code=401, detail="Invalid token")

    return get_user_from_token(token)

@api.post("/invoke")
async def invoke_graph(
    request: InvokeRequest,
    user = Depends(verify_token),
):
    # Prefix the thread ID with the user ID so users cannot reach each other's threads
    config = {"configurable": {"thread_id": f"{user.id}-{request.thread_id}"}}
    return await app.ainvoke(request.input, config)
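Joining the IDs with a hyphen, as in f"{user.id}-{request.thread_id}", is ambiguous when either part can contain a hyphen: user "a-b" with thread "c" and user "a" with thread "b-c" both map to "a-b-c" and would share checkpoints. One way to make the mapping collision-free is to length-prefix the user ID. A sketch; scoped_thread_id and owner_of are hypothetical helpers:

```python
def scoped_thread_id(user_id: object, thread_id: str) -> str:
    """Build a per-user thread ID that cannot collide across users.

    The user ID is length-prefixed, so the encoding stays unambiguous even
    when either part contains the separator character.
    """
    uid = str(user_id)
    return f"{len(uid)}:{uid}:{thread_id}"

def owner_of(scoped: str) -> str:
    """Recover the user ID from a scoped thread ID (e.g. for ownership checks)."""
    length, sep, rest = scoped.partition(":")
    if not sep or not length.isdigit() or rest[int(length):int(length) + 1] != ":":
        raise ValueError(f"malformed scoped thread ID: {scoped!r}")
    return rest[:int(length)]
```

In the endpoint, the config would then use {"thread_id": scoped_thread_id(user.id, request.thread_id)}, and owner_of lets any code that receives a stored thread ID confirm it belongs to the caller.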

Rate Limiting

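from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
api.state.limiter = limiter
# Return HTTP 429 when a client exceeds its limit
api.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@api.post("/invoke")
@limiter.limit("10/minute")
async def invoke_graph(request: Request, invoke_request: InvokeRequest):
    # slowapi needs the raw `request` argument to identify the client
    config = {"configurable": {"thread_id": invoke_request.thread_id}}
    return await app.ainvoke(invoke_request.input, config)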
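slowapi keeps its counters in process memory by default, so with the three replicas in the Deployment manifest each pod enforces its own 10/minute, and a client can get as many as 30 requests per minute through the Service. For one global limit, configure slowapi with shared storage such as Redis (its storage_uri option). To show what a "10/minute" rule checks, here is a minimal in-process sliding-window limiter; it is an illustration under these assumptions, not slowapi's implementation:

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict

class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window` seconds for each client key.

    State lives in this process only, which is why per-replica limits
    multiply when the service is scaled out. Keys are never evicted here.
    """

    def __init__(self, limit: int, window: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self.clock()
        hits = self._hits[key]
        # Forget requests that have slid out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True
```

SlidingWindowLimiter(limit=10, window=60.0) mirrors "10/minute" for a single replica; the injectable clock exists only to make the behavior testable.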

Best Practices

  • Use persistent checkpointers: PostgreSQL or managed services for production
  • Implement health checks: Monitor application and database health
  • Enable tracing: Use LangSmith for observability
  • Handle errors gracefully: Return meaningful error messages
  • Validate input: Check user input before processing
  • Set resource limits: Prevent resource exhaustion
  • Use environment variables: Never hardcode secrets
  • Implement retries: Handle transient failures
  • Monitor performance: Track latency and throughput
  • Plan for scaling: Design for horizontal scaling from the start
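Retrying transient failures means retrying only errors that may succeed on a second attempt (dropped connections, timeouts), waiting longer between attempts, and giving up after a bounded number of tries. A generic sketch for calls made outside the graph, such as a client calling /invoke or a worker calling an external service; the function name, defaults, and exception types are assumptions. Inside a graph, LangGraph's node-level RetryPolicy (langgraph.types) is the more direct tool.

```python
import random
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")

def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying transient errors with capped exponential backoff and full jitter."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Wait a random time in [0, base_delay * 2**(attempt - 1)], capped at max_delay
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** (attempt - 1))))
    raise RuntimeError("unreachable")  # the loop always returns or raises
```

Errors outside retry_on (validation errors, for example) propagate immediately. Retrying a POST to /invoke re-runs the graph on that thread, so make sure repeated side effects are acceptable before retrying it.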
