# Understanding Rate Limits
The X API enforces rate limits to ensure fair usage and platform stability. Different endpoints have different limits based on authentication method and access tier.
## Rate Limit Structure

- **Per-user limits**: apply per authenticated user on OAuth endpoints
- **Per-app limits**: app-level limits for bearer token requests
- **Time windows**: typically 15 minutes; some limits use 24-hour windows
- **Tier-dependent**: Free, Basic, Pro, and Enterprise each have different limits
## Common Rate Limits

### Read Operations (GET)

| Endpoint | Free Tier | Basic Tier | Notes |
| --- | --- | --- | --- |
| GET /2/tweets/:id | 300/15min | 900/15min | Per user |
| GET /2/tweets/search/recent | 180/15min | 450/15min | Per app |
| GET /2/users/:id | 300/15min | 900/15min | Per user |
| GET /2/users/:id/following | 15/15min | 75/15min | Per user |
| GET /2/dm_events | 300/15min | 900/15min | Per user |
### Write Operations (POST/DELETE)

| Endpoint | Free Tier | Basic Tier | Notes |
| --- | --- | --- | --- |
| POST /2/tweets | 50/24hr | 100/24hr | Per user |
| DELETE /2/tweets/:id | 50/24hr | 100/24hr | Per user |
| POST /2/users/:id/following | 50/24hr | 1000/24hr | Per user |
| POST /2/dm_conversations | 200/24hr | 1000/24hr | Per user |
Rate limits vary by access tier and endpoint. Check the X API documentation for specific limits.
## Detecting Rate Limits

Rate limit information is included in the response headers:

```python
response = client.posts.get_by_id(id="1234567890")

# Access rate limit headers on the underlying HTTP response
if hasattr(response, '__response__'):
    headers = response.__response__.headers
    limit = headers.get('x-rate-limit-limit')          # Max requests in the window
    remaining = headers.get('x-rate-limit-remaining')  # Requests left in the window
    reset = headers.get('x-rate-limit-reset')          # Window reset (Unix timestamp)

    print(f"Limit: {limit}")
    print(f"Remaining: {remaining}")
    print(f"Resets at: {reset}")
```
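The reset header is a Unix timestamp in string form, so it usually needs a little conversion before you can act on it. A minimal sketch, assuming that raw header value as input (the helper names `seconds_until_reset` and `reset_as_datetime` are illustrative, not part of the SDK):

```python
import time
from datetime import datetime, timezone

def seconds_until_reset(reset_header, now=None):
    """Seconds remaining until the rate-limit window resets.

    `reset_header` is the raw x-rate-limit-reset value (a Unix-timestamp
    string); a missing header is treated as an already-expired window.
    """
    if reset_header is None:
        return 0.0
    now = time.time() if now is None else now
    return max(int(reset_header) - now, 0.0)

def reset_as_datetime(reset_header):
    """Render the reset timestamp as an aware UTC datetime for logging."""
    return datetime.fromtimestamp(int(reset_header), tz=timezone.utc)
```

The injectable `now` parameter makes the arithmetic easy to unit-test without touching the real clock.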
### Handle 429 Errors

When rate limited, the API returns a 429 status code:

```python
import time

from requests.exceptions import HTTPError

try:
    response = client.posts.create(body={"text": "Hello World!"})
except HTTPError as e:
    if e.response.status_code == 429:
        # Get the reset time from the headers
        reset_time = int(e.response.headers.get('x-rate-limit-reset', 0))
        wait_time = reset_time - time.time()
        if wait_time > 0:
            print(f"Rate limited. Waiting {wait_time:.0f} seconds...")
            time.sleep(wait_time + 1)  # Add a 1-second buffer
        # Retry the request here
    else:
        raise
```
## Rate Limit Strategies

### 1. Exponential Backoff

Implement exponential backoff for retries:

```python
import random
import time

from requests.exceptions import HTTPError

def make_request_with_backoff(func, max_retries=5):
    """Execute a request, backing off exponentially on 429 responses."""
    for attempt in range(max_retries):
        try:
            return func()
        except HTTPError as e:
            if e.response.status_code == 429:
                if attempt == max_retries - 1:
                    raise  # Max retries reached
                # Calculate backoff: 2^attempt plus jitter
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Retry {attempt + 1}/{max_retries} in {wait_time:.2f}s")
                time.sleep(wait_time)
            else:
                raise

# Wrap any SDK call in a zero-argument callable
response = make_request_with_backoff(
    lambda: client.posts.create(body={"text": "Test"})
)
```
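The delay calculation itself can be factored into a pure function, which makes it easy to unit-test and to swap in variants such as AWS-style "full jitter". A sketch (the `backoff_delay` name and its defaults are illustrative choices, not anything the X API prescribes):

```python
import random

def backoff_delay(attempt, base=2.0, cap=60.0, rng=random.random):
    """Full-jitter backoff: a random delay in [0, min(cap, base**attempt)].

    `attempt` is 0-based; `rng` is injectable so tests can be deterministic.
    """
    return min(cap, base ** attempt) * rng()
```

Capping the delay keeps late retries from sleeping for minutes, and the jitter prevents many clients from retrying in lockstep.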
### 2. Request Queue with Rate Limiting

Control the request rate proactively:

```python
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.requests = deque()

    def wait_if_needed(self):
        """Wait if the rate limit would be exceeded."""
        now = time.time()
        # Evict requests that fell outside the time window
        while self.requests and self.requests[0] <= now - self.time_window:
            self.requests.popleft()
        # If at the limit, sleep until the oldest request ages out
        if len(self.requests) >= self.max_requests:
            wait_time = self.time_window - (now - self.requests[0])
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.2f}s...")
                time.sleep(wait_time)
            # Only the oldest request has expired; clearing the whole deque
            # here would allow an immediate burst over the limit
            self.requests.popleft()
        # Record this request
        self.requests.append(time.time())

# Create a rate limiter: 50 requests per 15 minutes
limiter = RateLimiter(max_requests=50, time_window=900)

# Call before each request
for i in range(100):
    limiter.wait_if_needed()
    response = client.posts.get_by_id(id=f"tweet_{i}")
    print(f"Processed tweet {i}")
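When many call sites share one limiter, a small decorator keeps the `wait_if_needed()` calls out of the business logic. A sketch (`rate_limited` is my own helper name, not an SDK feature; it works with any object exposing a `wait_if_needed()` method, like the class above):

```python
import functools

def rate_limited(limiter):
    """Decorator that calls `limiter.wait_if_needed()` before each invocation."""
    def wrap(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            limiter.wait_if_needed()
            return func(*args, **kwargs)
        return inner
    return wrap

# Usage: decorate any function that issues an API request, e.g.
# @rate_limited(limiter)
# def fetch_tweet(tweet_id):
#     return client.posts.get_by_id(id=tweet_id)
```

Because the limiter is passed in, one shared instance can throttle several decorated functions against the same window.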
### 3. Batch Requests

Use batch endpoints to reduce the request count:

```python
# ❌ Inefficient: one request per tweet (100 requests)
for tweet_id in tweet_ids:
    tweet = client.posts.get_by_id(id=tweet_id)
    process(tweet)

# ✅ Efficient: one request per 100 tweets
for i in range(0, len(tweet_ids), 100):
    batch = tweet_ids[i:i + 100]
    response = client.posts.get_by_ids(
        ids=batch,
        tweet_fields=["created_at", "public_metrics"]
    )
    for tweet in response.data:
        process(tweet)
```
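The slicing loop can be extracted into a small general-purpose helper (a hypothetical utility, not part of the SDK), which also guards against a zero or negative batch size:

```python
def chunked(items, size=100):
    """Split a sequence into consecutive batches of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]
```

With it, the batch loop becomes `for batch in chunked(tweet_ids): ...`, and the 100-item limit lives in exactly one place.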
## Authentication and Rate Limits

### OAuth 2.0 vs Bearer Token

Choose authentication based on your needs:

```python
from xdk import Client

# Bearer token: app-level rate limits
client_app = Client(bearer_token="YOUR_BEARER_TOKEN")

# OAuth 2.0: per-user rate limits, often higher for
# user-specific operations
client_user = Client(access_token="USER_ACCESS_TOKEN")

# Use the appropriate auth for each operation.
# Read-only: a bearer token is simpler
tweets = client_app.posts.search_recent(query="python")

# Write operations: OAuth is required
post = client_user.posts.create(body={"text": "Hello!"})
```
**Rate limit differences:**

| Operation | Bearer Token | OAuth 2.0 User Context |
| --- | --- | --- |
| Read public data | Lower limits | Higher limits |
| Write posts | Not allowed | Allowed, with limits |
| Access user data | Limited | Full access |
| Multiple users | App-wide limit | Per-user limits |
### Control Request Rate

Add delays between pages when paginating:

```python
import time

for page in client.posts.search_recent(
    query="python",
    max_results=100
):
    for tweet in page.data:
        process_tweet(tweet)
    # Wait 1 second between pages
    time.sleep(1)
```
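The same pacing can be wrapped into a reusable generator so the sleep isn't repeated at every call site. A sketch (`paced` is an illustrative name; the `sleep` parameter is injectable so tests don't actually wait):

```python
import time

def paced(iterable, delay=1.0, sleep=time.sleep):
    """Yield items from `iterable`, sleeping `delay` seconds between them.

    No sleep happens before the first item or after the last one.
    """
    first = True
    for item in iterable:
        if not first:
            sleep(delay)
        yield item
        first = False
```

Wrapping the pager, e.g. `for page in paced(pages, delay=1.0): ...`, keeps the throttling policy separate from the processing code.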
### Limit Total Requests

Stop paginating after a fixed number of pages:

```python
# Only fetch the first N pages
max_pages = 10
page_count = 0

for page in client.posts.search_recent(query="machine learning"):
    process_page(page)
    page_count += 1
    if page_count >= max_pages:
        break
```
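The manual counter can also be replaced with `itertools.islice` from the standard library, which caps any iterator at a fixed number of items:

```python
from itertools import islice

def first_n(iterable, n):
    """Yield at most the first `n` items of `iterable`."""
    # islice consumes lazily, so a pager that fetches on demand
    # never requests pages beyond the cap
    return islice(iterable, n)
```

Assuming the pager is a plain iterable as in the loop above, usage would look like `for page in first_n(client.posts.search_recent(query="..."), 10): ...`.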
### Optimize Field Selection

Request only the fields you need:

```python
# ❌ Requesting everything (larger, slower responses)
response = client.posts.search_recent(
    query="python",
    tweet_fields=["created_at", "author_id", "entities", "public_metrics",
                  "referenced_tweets", "conversation_id", "reply_settings"],
    expansions=["author_id", "referenced_tweets.id", "attachments.media_keys"],
    user_fields=["username", "verified", "description", "profile_image_url"],
    media_fields=["url", "preview_image_url", "type", "width", "height"]
)

# ✅ Requesting only needed fields (smaller, faster responses)
response = client.posts.search_recent(
    query="python",
    tweet_fields=["created_at", "public_metrics"],
    expansions=["author_id"],
    user_fields=["username"]
)
```
## Caching Strategies

Cache responses to reduce API calls:

```python
import json
import time
from pathlib import Path

class CachedClient:
    def __init__(self, client, cache_dir=".cache", ttl=3600):
        self.client = client
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.ttl = ttl  # Time to live in seconds

    def get_user_cached(self, user_id):
        """Get a user as a dict, serving from cache when fresh."""
        cache_file = self.cache_dir / f"user_{user_id}.json"

        # Serve from cache if the entry is still fresh
        if cache_file.exists():
            cache_age = time.time() - cache_file.stat().st_mtime
            if cache_age < self.ttl:
                with open(cache_file) as f:
                    return json.load(f)

        # Fetch from the API
        response = self.client.users.get_by_id(
            id=user_id,
            user_fields=["username", "verified", "public_metrics"]
        )
        user = response.data.model_dump()

        # Cache the dict so both paths return the same shape
        with open(cache_file, 'w') as f:
            json.dump(user, f)
        return user

# Use the cached client
cached = CachedClient(client, ttl=3600)  # 1-hour cache

# The first call hits the API
user = cached.get_user_cached("123456789")

# Subsequent calls within the hour are served from cache
user = cached.get_user_cached("123456789")
```
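For data that doesn't need to survive restarts, the same idea works in memory with no filesystem at all. A minimal sketch (the `TTLCache` name and the injectable `clock` are my own choices, the latter so expiry can be tested deterministically):

```python
import time

class TTLCache:
    """Tiny in-memory cache where each entry expires after `ttl` seconds."""

    def __init__(self, ttl=3600, clock=time.time):
        self.ttl = ttl
        self.clock = clock
        self._store = {}  # key -> (value, stored_at)

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if self.clock() - stored_at >= self.ttl:
            del self._store[key]  # expired; evict lazily on read
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, self.clock())
```

A dict-backed cache avoids disk I/O on every hit, at the cost of losing the cache on process restart; pick whichever trade-off fits your workload.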
## Monitoring Usage

### Track API Calls

```python
import logging
from datetime import datetime

class UsageTracker:
    def __init__(self):
        self.calls = {}
        self.logger = logging.getLogger(__name__)

    def track_call(self, endpoint):
        """Record one API call to the given endpoint."""
        if endpoint not in self.calls:
            self.calls[endpoint] = []
        self.calls[endpoint].append(datetime.now())

    def get_stats(self, minutes=15):
        """Count calls per endpoint within the time window."""
        cutoff = datetime.now().timestamp() - (minutes * 60)
        stats = {}
        for endpoint, calls in self.calls.items():
            recent = [c for c in calls if c.timestamp() > cutoff]
            stats[endpoint] = len(recent)
        return stats

    def print_stats(self):
        """Print usage statistics."""
        stats = self.get_stats(15)
        print("\n=== API Usage (Last 15 min) ===")
        for endpoint, count in stats.items():
            print(f"{endpoint}: {count} calls")

# Use the tracker
tracker = UsageTracker()

# Track each call as you make it
tracker.track_call("posts.search_recent")
response = client.posts.search_recent(query="test")

tracker.track_call("users.get_by_id")
user = client.users.get_by_id(id="123")

# Print the stats
tracker.print_stats()
```
## Error Recovery Patterns

### Comprehensive Error Handling

```python
import time

from requests.exceptions import ConnectionError, HTTPError, Timeout

def robust_api_call(func, max_retries=3):
    """Make an API call with comprehensive error handling."""
    for attempt in range(max_retries):
        try:
            return func()
        except HTTPError as e:
            status = e.response.status_code
            if status == 429:
                # Rate limited: wait until the window resets, then retry
                reset = int(e.response.headers.get('x-rate-limit-reset', 0))
                wait = max(reset - time.time(), 0) + 1
                print(f"Rate limited. Waiting {wait:.0f}s...")
                time.sleep(wait)
            elif status in (500, 502, 503, 504):
                # Server error: retry with backoff
                wait = 2 ** attempt
                print(f"Server error {status}. Retrying in {wait}s...")
                time.sleep(wait)
            elif status in (401, 403):
                # Auth error: don't retry
                print(f"Authentication failed: {e.response.text}")
                raise
            else:
                # Other client error: don't retry
                print(f"Client error {status}: {e.response.text}")
                raise
        except (ConnectionError, Timeout):
            # Network error: retry with backoff
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Network error. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise
    raise Exception(f"Max retries ({max_retries}) exceeded")

# Use the robust caller
response = robust_api_call(
    lambda: client.posts.create(body={"text": "Test post"})
)
```
## Best Practices Summary

**Monitor rate limits.** Track usage and watch the remaining-call headers:

```python
# Check remaining calls before critical operations
if remaining < 10:
    print("Low on API calls, slowing down...")
    time.sleep(60)
```

**Use batch operations.** Fetch multiple items in a single request:

```python
# One batch request instead of many individual ones
response = client.posts.get_by_ids(ids=batch)
```

**Implement caching.** Cache data that doesn't change frequently:

```python
# Cache user profiles for 1 hour
cached_users = {}
```

**Add delays.** Pace your requests to stay within limits:

```python
# Small delay between requests
time.sleep(0.1)
```

**Handle errors gracefully.** Always catch and handle rate limit errors:

```python
try:
    response = make_request()
except HTTPError as e:
    if e.response.status_code == 429:
        handle_rate_limit(e)
```

For production applications, implement comprehensive monitoring, caching, and rate limiting strategies from the start. This prevents issues as your usage scales.