Overview
Rate limiting controls how frequently your application makes API calls to LLM providers, preventing quota exhaustion, managing costs, and avoiding service throttling. LangChain provides both model-level and agent-level rate limiting.
Model-Level Rate Limiting
Apply rate limits directly to chat models using the built-in InMemoryRateLimiter:
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_anthropic import ChatAnthropic

# Limit to 1 request per 10 seconds
rate_limiter = InMemoryRateLimiter(
    requests_per_second=0.1,    # 1 request every 10 seconds
    check_every_n_seconds=0.1,  # Check every 100ms
    max_bucket_size=10,         # Allow bursts up to 10 requests
)

model = ChatAnthropic(
    model="claude-sonnet-4-5-20250929",
    rate_limiter=rate_limiter,
)

# Calls are automatically rate limited
for i in range(5):
    response = model.invoke("Hello")
    print(f"Request {i + 1} completed")
Each call blocks until a token is available. The bucket starts empty, so 5 requests at a limit of 0.1 requests/second take about 50 seconds to complete.
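As a rough check on that figure, the arithmetic can be sketched in plain Python. The helper name `estimated_seconds` is hypothetical and not part of LangChain; it assumes the bucket starts empty and ignores the polling interval and the latency of the requests themselves.

```python
def estimated_seconds(n_requests: int, requests_per_second: float,
                      initial_tokens: float = 0.0) -> float:
    """Approximate time until the n-th request is admitted.

    Hypothetical helper: ignores polling granularity and network latency.
    """
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive")
    # Requests covered by tokens already in the bucket do not wait.
    waiting = max(0.0, n_requests - initial_tokens)
    return waiting / requests_per_second


print(estimated_seconds(5, 0.1))  # roughly 50 seconds
```

The same formula shows why a pre-filled bucket shortens the wait: tokens that have already accumulated are subtracted before dividing by the rate.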
InMemoryRateLimiter
Token bucket algorithm for time-based rate limiting.
Parameters
requests_per_second: Number of requests allowed per second. Fractional values enable slower rates:
1.0: 1 request per second
0.1: 1 request per 10 seconds
10.0: 10 requests per second
check_every_n_seconds: How often to check for available tokens (in seconds). Lower values provide more granular timing at the cost of slightly more overhead.
max_bucket_size: Maximum number of tokens that can accumulate. Controls burst behavior:
1.0: No burst, strictly sequential
10.0: Allow bursts of up to 10 requests
For example, with a rate of 10 requests/second, a max_bucket_size of 20 allows 20 requests to go out immediately once that many tokens have accumulated.
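To make the interaction between rate and bucket size concrete, here is a small sketch. `burst_capacity` is a hypothetical helper, not a LangChain function, and it assumes the bucket was empty when the idle period began.

```python
def burst_capacity(idle_seconds: float, requests_per_second: float,
                   max_bucket_size: float) -> int:
    """Whole requests that can go out immediately after an idle period.

    Hypothetical helper; assumes the bucket was empty when the idle period began.
    """
    accumulated = idle_seconds * requests_per_second
    return int(min(accumulated, max_bucket_size))


# Rate of 10/second: the bucket size, not the rate, bounds the burst.
print(burst_capacity(5.0, 10.0, max_bucket_size=1.0))   # 1
print(burst_capacity(5.0, 10.0, max_bucket_size=20.0))  # 20
print(burst_capacity(0.5, 10.0, max_bucket_size=20.0))  # 5
```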
Token Bucket Algorithm
The rate limiter uses a token bucket:
The bucket fills with tokens at the requests_per_second rate
Each request consumes 1 token
If the bucket is empty, the request blocks until a token is available
Bucket capacity is capped at max_bucket_size
# Example: 2 requests/second, max 5 tokens
rate_limiter = InMemoryRateLimiter(
    requests_per_second=2.0,
    max_bucket_size=5.0,
)

# Scenario:
# - Start: 0 tokens
# - Wait 2.5 seconds: 5 tokens (capped at max_bucket_size)
# - Make 5 rapid requests: All succeed immediately (burst)
# - 6th request: Blocks for 0.5 seconds until next token
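The scenario can be replayed with a self-contained token bucket. This is an illustrative sketch, not LangChain's implementation: the `TokenBucket` class, its `try_acquire` method, and the injectable `clock` argument are assumptions made so the example runs without real waiting.

```python
import time
from typing import Callable


class TokenBucket:
    """Illustrative token bucket; not LangChain's implementation."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum tokens held
        self.tokens = 0.0           # start empty
        self._clock = clock
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self._last = now

    def try_acquire(self) -> bool:
        """Take one token if available; never blocks."""
        self._refill()
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


# Replay the scenario with a fake clock so no real waiting is needed.
now = [0.0]
bucket = TokenBucket(rate=2.0, capacity=5.0, clock=lambda: now[0])

now[0] = 2.5                                      # wait 2.5 seconds
burst = [bucket.try_acquire() for _ in range(6)]  # 5 succeed, the 6th fails
now[0] = 3.0                                      # 0.5 seconds later
after_wait = bucket.try_acquire()                 # one new token has arrived
print(burst, after_wait)
```

A blocking variant would loop on `try_acquire` and sleep between attempts, which is where a polling interval such as check_every_n_seconds comes in.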
Blocking vs Non-Blocking
Control whether to wait for tokens or fail immediately:
rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)

# Blocking (default): Waits for token
if rate_limiter.acquire(blocking=True):
    make_request()

# Non-blocking: Returns False immediately if no token
if rate_limiter.acquire(blocking=False):
    make_request()
else:
    print("Rate limit reached, skipping request")

# Async version (must run inside an async function)
if await rate_limiter.aacquire(blocking=True):
    await make_async_request()
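Non-blocking acquisition is mostly useful for shedding optional work instead of queueing it. The sketch below shows one way to wrap that pattern. `call_or_skip`, `SupportsAcquire`, and the `FixedBudget` stand-in limiter are hypothetical names introduced for illustration; any object with a compatible `acquire` method should fit.

```python
from typing import Callable, Optional, Protocol, TypeVar

T = TypeVar("T")


class SupportsAcquire(Protocol):
    def acquire(self, *, blocking: bool = True) -> bool: ...


def call_or_skip(limiter: SupportsAcquire, func: Callable[[], T]) -> Optional[T]:
    """Run func only if a token is available right now; otherwise return None."""
    if limiter.acquire(blocking=False):
        return func()
    return None


class FixedBudget:
    """Stand-in limiter for the demo: hands out a fixed number of tokens."""

    def __init__(self, tokens: int):
        self.tokens = tokens

    def acquire(self, *, blocking: bool = True) -> bool:
        if self.tokens > 0:
            self.tokens -= 1
            return True
        return False


limiter = FixedBudget(tokens=2)
results = [call_or_skip(limiter, lambda: "ok") for _ in range(3)]
print(results)  # ['ok', 'ok', None]
```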
Custom Rate Limiter
Implement BaseRateLimiter for custom strategies (distributed rate limiting, API-specific quotas, etc.):
import asyncio
import time
import uuid

import redis
from langchain_core.rate_limiters import BaseRateLimiter
from langchain_openai import ChatOpenAI


class RedisRateLimiter(BaseRateLimiter):
    """Distributed rate limiter using a Redis sorted set as a sliding window."""

    def __init__(self, key: str, max_requests: int, window_seconds: int):
        self.redis = redis.Redis(host="localhost", port=6379)
        self.key = key
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def acquire(self, *, blocking: bool = True) -> bool:
        """Acquire token using Redis sliding window."""
        while True:
            current_time = time.time()
            window_start = current_time - self.window_seconds
            # Remove entries that have left the window
            self.redis.zremrangebyscore(self.key, 0, window_start)
            # Count requests in current window.
            # Note: this check-then-add sequence is not atomic; use a Lua
            # script or a transaction if strict limits matter under load.
            current_count = self.redis.zcard(self.key)
            if current_count < self.max_requests:
                # Add current request; the random suffix keeps members unique
                member = f"{current_time}:{uuid.uuid4().hex}"
                self.redis.zadd(self.key, {member: current_time})
                self.redis.expire(self.key, self.window_seconds)
                return True
            if not blocking:
                return False
            # Wait for the oldest entry to leave the window, then retry
            oldest = self.redis.zrange(self.key, 0, 0, withscores=True)
            wait_time = oldest[0][1] + self.window_seconds - current_time if oldest else 0.0
            time.sleep(max(wait_time, 0.01))

    async def aacquire(self, *, blocking: bool = True) -> bool:
        """Async version; runs the sync path in a worker thread."""
        # A native implementation would use an async Redis client instead.
        return await asyncio.to_thread(self.acquire, blocking=blocking)


# Usage with shared state across processes
rate_limiter = RedisRateLimiter(
    key="api_calls:openai",
    max_requests=100,
    window_seconds=60,
)
model = ChatOpenAI(model="gpt-4", rate_limiter=rate_limiter)
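The sliding-window logic is easier to follow without the Redis calls. The sketch below keeps the same steps (evict old timestamps, count, admit) in a local deque. `SlidingWindowLimiter`, `try_acquire`, `seconds_until_free`, and the injectable `clock` are hypothetical names for illustration; this version is single-process and does not replace the Redis-backed limiter.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """In-memory sliding-window log; illustrative only."""

    def __init__(self, max_requests: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._stamps: Deque[float] = deque()

    def try_acquire(self) -> bool:
        now = self._clock()
        # Drop timestamps that have left the window (mirrors ZREMRANGEBYSCORE).
        while self._stamps and self._stamps[0] <= now - self.window_seconds:
            self._stamps.popleft()
        # Admit the request if the window still has room (mirrors ZCARD + ZADD).
        if len(self._stamps) < self.max_requests:
            self._stamps.append(now)
            return True
        return False

    def seconds_until_free(self) -> float:
        """Time until the oldest entry expires; 0.0 if there is room now."""
        if len(self._stamps) < self.max_requests:
            return 0.0
        return max(0.0, self._stamps[0] + self.window_seconds - self._clock())


now = [0.0]
limiter = SlidingWindowLimiter(max_requests=2, window_seconds=60, clock=lambda: now[0])
first, second, third = (limiter.try_acquire() for _ in range(3))
wait = limiter.seconds_until_free()   # oldest entry was added at t=0
now[0] = 60.0
fourth = limiter.try_acquire()        # entries added at t=0 have expired
print(first, second, third, wait, fourth)
```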
BaseRateLimiter Interface
acquire: Synchronous token acquisition. Parameters:
blocking (bool): Wait for token if True, return immediately if False
Returns: True if token acquired, False if rate limited (non-blocking only)
aacquire: Async token acquisition. Parameters:
blocking (bool): Wait for token if True, return immediately if False
Returns: True if token acquired, False if rate limited (non-blocking only)
Agent-Level Rate Limiting
Combine rate limiting with middleware for finer control:
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langchain_core.rate_limiters import InMemoryRateLimiter

rate_limiter = InMemoryRateLimiter(requests_per_second=2.0)

@wrap_model_call
def rate_limit_model(request: ModelRequest, handler) -> ModelResponse:
    """Rate limit all model calls in agent."""
    # Wait for rate limit
    rate_limiter.acquire(blocking=True)
    # Proceed with request
    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[search_tool],
    middleware=[rate_limit_model],
)
Limit specific tool execution rates:
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call
from langchain.agents.middleware.types import ToolCallRequest
from langchain_core.rate_limiters import InMemoryRateLimiter

# Separate rate limiters per tool
api_limiter = InMemoryRateLimiter(requests_per_second=5.0)
db_limiter = InMemoryRateLimiter(requests_per_second=10.0)

@wrap_tool_call
def rate_limit_tools(request: ToolCallRequest, handler):
    """Apply different rate limits per tool."""
    tool_name = request.tool.name if request.tool else request.tool_call["name"]
    if tool_name == "api_search":
        api_limiter.acquire(blocking=True)
    elif tool_name == "database_query":
        db_limiter.acquire(blocking=True)
    return handler(request)

agent = create_agent(
    model="openai:gpt-4",
    tools=[api_search, database_query],
    middleware=[rate_limit_tools],
)
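Once more than a few tools need limits, a lookup table may be easier to maintain than an if/elif chain. The following is a plain-Python sketch of that dispatch idea, independent of the middleware API: `CountingLimiter`, `TOOL_LIMITERS`, and `run_tool` are hypothetical names, and the stand-in limiter only counts acquisitions so the example runs without waiting.

```python
from typing import Callable, Dict


class CountingLimiter:
    """Stand-in for a real limiter; only counts acquisitions."""

    def __init__(self) -> None:
        self.acquired = 0

    def acquire(self, *, blocking: bool = True) -> bool:
        self.acquired += 1
        return True


# Hypothetical tool names, each mapped to its own limiter.
TOOL_LIMITERS: Dict[str, CountingLimiter] = {
    "api_search": CountingLimiter(),
    "database_query": CountingLimiter(),
}


def run_tool(tool_name: str, handler: Callable[[], str]) -> str:
    """Acquire from the tool's limiter, if it has one, then run the handler."""
    limiter = TOOL_LIMITERS.get(tool_name)
    if limiter is not None:
        limiter.acquire(blocking=True)
    return handler()


run_tool("api_search", lambda: "results")
run_tool("api_search", lambda: "results")
run_tool("calculator", lambda: "42")   # no limiter registered: runs unthrottled
counts = {name: lim.acquired for name, lim in TOOL_LIMITERS.items()}
print(counts)  # {'api_search': 2, 'database_query': 0}
```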
Dynamic Rate Limiting
Adjust rate limits based on context:
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain.agents.middleware.types import ModelRequest, ModelResponse
from langchain_core.messages import HumanMessage
from langchain_core.rate_limiters import InMemoryRateLimiter
from langgraph.config import get_config

class DynamicRateLimiter:
    """Rate limiter that adjusts based on user tier."""

    def __init__(self):
        self.limiters = {
            "free": InMemoryRateLimiter(requests_per_second=0.5),
            "pro": InMemoryRateLimiter(requests_per_second=5.0),
            "enterprise": InMemoryRateLimiter(requests_per_second=50.0),
        }

    def get_limiter(self, user_tier: str) -> InMemoryRateLimiter:
        # Unknown tiers fall back to the most restrictive limiter
        return self.limiters.get(user_tier, self.limiters["free"])

dynamic_limiter = DynamicRateLimiter()

@wrap_model_call
def tiered_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    """Apply rate limit based on user tier."""
    # Read the tier from the config passed to invoke()
    user_tier = get_config().get("configurable", {}).get("tier", "free")
    limiter = dynamic_limiter.get_limiter(user_tier)
    limiter.acquire(blocking=True)
    return handler(request)

# Usage
agent = create_agent(
    model="openai:gpt-4",
    middleware=[tiered_rate_limit],
)

# Set user tier in config
response = agent.invoke(
    {"messages": [HumanMessage("Hello")]},
    config={"configurable": {"tier": "pro"}},
)
Combining with Retry Logic
Use rate limiting with retry middleware for resilient API calls:
from langchain.agents import create_agent
from langchain.agents.middleware import ModelRetryMiddleware
from langchain_core.rate_limiters import InMemoryRateLimiter
from langchain_openai import ChatOpenAI

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)

retry_middleware = ModelRetryMiddleware(
    max_retries=3,
    backoff_factor=2.0,
)

model = ChatOpenAI(
    model="gpt-4",
    rate_limiter=rate_limiter,  # Rate limit at model level
)

agent = create_agent(
    model=model,
    middleware=[retry_middleware],  # Retry on errors
)
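To get a feel for how much delay the retry settings can add on top of rate limiting, the sketch below computes a plain exponential schedule. It assumes a 1-second initial delay and no jitter, which may not match the middleware's actual defaults; `backoff_delays` is a hypothetical helper, not part of LangChain.

```python
from typing import List


def backoff_delays(max_retries: int, backoff_factor: float,
                   initial_delay: float = 1.0) -> List[float]:
    """Delay before each retry under a plain exponential schedule (no jitter)."""
    return [initial_delay * backoff_factor ** attempt for attempt in range(max_retries)]


delays = backoff_delays(max_retries=3, backoff_factor=2.0)
print(delays)       # [1.0, 2.0, 4.0]
print(sum(delays))  # 7.0 seconds of added waiting in the worst case
```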
Monitoring Rate Limit Usage
Track rate limit consumption:
import time

from langchain.agents import create_agent
from langchain.agents.middleware import AgentMiddleware
from langchain_core.rate_limiters import InMemoryRateLimiter

class RateLimitMonitor(AgentMiddleware):
    """Monitor rate limit token consumption."""

    def __init__(self, rate_limiter: InMemoryRateLimiter):
        super().__init__()
        self.rate_limiter = rate_limiter
        self.wait_time_total = 0.0

    def wrap_model_call(self, request, handler):
        start = time.monotonic()
        # Acquire with monitoring
        self.rate_limiter.acquire(blocking=True)
        wait_time = time.monotonic() - start
        self.wait_time_total += wait_time
        # Ignore the negligible overhead of an uncontended acquire
        if wait_time > 0.01:
            print(f"Waited {wait_time:.2f}s for rate limit")
        return handler(request)

    def after_agent(self, state, runtime):
        print(f"Total rate limit wait time: {self.wait_time_total:.2f}s")
        print(f"Available tokens: {self.rate_limiter.available_tokens:.2f}")

rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
monitor = RateLimitMonitor(rate_limiter)

agent = create_agent(
    model="openai:gpt-4",
    middleware=[monitor],
)
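A running total hides whether waits are frequent and short or rare and long. One option is to keep a small summary instead, as in this sketch. `WaitStats` is a hypothetical helper, and the sample wait times are made up for the demonstration.

```python
from dataclasses import dataclass


@dataclass
class WaitStats:
    """Running summary of rate-limit wait times (illustrative)."""

    count: int = 0
    total: float = 0.0
    longest: float = 0.0

    def record(self, seconds: float) -> None:
        self.count += 1
        self.total += seconds
        self.longest = max(self.longest, seconds)

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


stats = WaitStats()
for wait in (0.0, 0.5, 1.5):   # sample wait times in seconds (made up)
    stats.record(wait)
print(stats.count, stats.total, stats.longest, round(stats.mean, 3))
```

A monitor could call `record` with each measured wait and report the mean and longest wait when the agent finishes.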
Best Practices
Start with conservative limits and increase based on monitoring:
# Start conservative
rate_limiter = InMemoryRateLimiter(
    requests_per_second=1.0,  # 1 req/sec initially
    max_bucket_size=2.0,      # Limited burst
)

# Monitor and adjust
# If no rate limit errors and fast enough, increase to 2.0 req/sec
# If hitting provider limits, decrease to 0.5 req/sec
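A starting value can be derived from the provider's published quota rather than guessed. The sketch below converts a per-minute quota to a per-second rate with some headroom. `safe_requests_per_second` is a hypothetical helper, and the quota figures are illustrative, not any provider's actual limits.

```python
def safe_requests_per_second(quota_per_minute: int, headroom: float = 0.8) -> float:
    """Convert a per-minute quota to a per-second rate, keeping some headroom.

    Hypothetical helper; headroom=0.8 means using 80% of the quota.
    """
    if not 0.0 < headroom <= 1.0:
        raise ValueError("headroom must be in (0, 1]")
    return quota_per_minute / 60.0 * headroom


print(safe_requests_per_second(60))        # 0.8
print(safe_requests_per_second(600, 0.5))  # 5.0
```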
Account for Burst Traffic
Set max_bucket_size to handle expected burst patterns:
# Handle morning traffic spike
rate_limiter = InMemoryRateLimiter(
    requests_per_second=5.0,  # Average rate
    max_bucket_size=50.0,     # Allow 50-request burst
)
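A large bucket raises the number of requests that can reach the provider in a short window, so it is worth checking the worst case against the provider's quota. The helpers below are hypothetical and assume the window starts with a full bucket.

```python
def max_requests_in_window(requests_per_second: float, max_bucket_size: float,
                           window_seconds: float) -> float:
    """Upper bound on requests admitted in a window that starts with a full bucket."""
    return max_bucket_size + requests_per_second * window_seconds


def refill_seconds(requests_per_second: float, max_bucket_size: float) -> float:
    """Idle time needed to refill an empty bucket."""
    return max_bucket_size / requests_per_second


print(max_requests_in_window(5.0, 50.0, 60.0))  # 350.0
print(refill_seconds(5.0, 50.0))                # 10.0
```

With 5 requests/second and a bucket of 50, up to 350 requests can go out in one minute, not 300, and a drained bucket needs 10 idle seconds to refill.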
Use Different Limits per Environment
Production and development should have different limits:
import os

if os.getenv("ENV") == "production":
    rate_limiter = InMemoryRateLimiter(requests_per_second=10.0)
else:
    rate_limiter = InMemoryRateLimiter(requests_per_second=1.0)
Track rate limit hits and set up alerts:
@wrap_model_call
def monitored_rate_limit(request: ModelRequest, handler) -> ModelResponse:
    start = time.time()
    rate_limiter.acquire(blocking=True)
    wait_time = time.time() - start
    if wait_time > 5.0:  # Alert if waiting >5 seconds
        # alert_ops_team is a placeholder for your own alerting hook
        alert_ops_team(f"High rate limit wait: {wait_time:.2f}s")
    return handler(request)
Limitations
InMemoryRateLimiter is in-memory only:
Does NOT work across multiple processes/servers
Resets on application restart
Thread-safe but not process-safe
For distributed systems, implement a custom BaseRateLimiter using Redis, DynamoDB, or similar.
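Because each process holds its own bucket, N processes that each allow r requests/second can admit up to N × r in aggregate. When a shared store is not available, one stopgap is to split the budget statically across workers, as sketched below. `per_process_rate` is a hypothetical helper; an even split wastes capacity when load is uneven, which is the problem a shared limiter avoids.

```python
def per_process_rate(total_requests_per_second: float, processes: int) -> float:
    """Split a shared budget evenly across independent in-memory limiters."""
    if processes < 1:
        raise ValueError("processes must be at least 1")
    return total_requests_per_second / processes


# Four workers sharing a 10 requests/second budget.
print(per_process_rate(10.0, 4))  # 2.5
```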
Next Steps
Middleware System: Build custom rate limiting middleware
Performance: Optimize performance with caching and batching
Custom Tools: Rate limit specific tools