
Understanding Rate Limits

The X API enforces rate limits to ensure fair usage and platform stability. Different endpoints have different limits based on authentication method and access tier.

Rate Limit Structure

Per-User Limits

Limits are per authenticated user for OAuth endpoints

Per-App Limits

App-level limits for bearer token requests

Time Windows

Typically 15-minute windows, some are 24-hour

Different by Tier

Free, Basic, Pro, and Enterprise have different limits

Common Rate Limits

Read Operations (GET)

| Endpoint | Free Tier | Basic Tier | Notes |
| --- | --- | --- | --- |
| GET /2/tweets/:id | 300/15min | 900/15min | Per user |
| GET /2/tweets/search/recent | 180/15min | 450/15min | Per app |
| GET /2/users/:id | 300/15min | 900/15min | Per user |
| GET /2/users/:id/following | 15/15min | 75/15min | Per user |
| GET /2/dm_events | 300/15min | 900/15min | Per user |

Write Operations (POST/DELETE)

| Endpoint | Free Tier | Basic Tier | Notes |
| --- | --- | --- | --- |
| POST /2/tweets | 50/24hr | 100/24hr | Per user |
| DELETE /2/tweets/:id | 50/24hr | 100/24hr | Per user |
| POST /2/users/:id/following | 50/24hr | 1000/24hr | Per user |
| POST /2/dm_conversations | 200/24hr | 1000/24hr | Per user |

Rate limits vary by access tier and endpoint. Check the X API documentation for specific limits.
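For programmatic checks, the tables above can be encoded as a plain lookup keyed by endpoint and tier. The numbers below mirror a subset of this page's tables; verify them against the current X API documentation before relying on them:

```python
# (max_requests, window_seconds) per (endpoint, tier), from the tables above.
RATE_LIMITS = {
    ("GET /2/tweets/:id", "free"): (300, 900),
    ("GET /2/tweets/:id", "basic"): (900, 900),
    ("GET /2/tweets/search/recent", "free"): (180, 900),
    ("GET /2/tweets/search/recent", "basic"): (450, 900),
    ("POST /2/tweets", "free"): (50, 86_400),
    ("POST /2/tweets", "basic"): (100, 86_400),
}

def limit_for(endpoint: str, tier: str) -> tuple[int, int]:
    """Return (max requests, window length in seconds) for an endpoint/tier."""
    return RATE_LIMITS[(endpoint, tier)]

print(limit_for("POST /2/tweets", "free"))  # (50, 86400)
```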

Detecting Rate Limits

Response Headers

Rate limit information is included in response headers:
import requests

response = client.posts.get_by_id(id="1234567890")

# Access rate limit headers
if hasattr(response, '__response__'):
    headers = response.__response__.headers
    
    limit = headers.get('x-rate-limit-limit')          # Max requests
    remaining = headers.get('x-rate-limit-remaining')  # Remaining requests
    reset = headers.get('x-rate-limit-reset')          # Reset timestamp
    
    print(f"Limit: {limit}")
    print(f"Remaining: {remaining}")
    print(f"Resets at: {reset}")

Handle 429 Errors

When rate limited, the API returns a 429 status code:
from requests.exceptions import HTTPError
import time

try:
    response = client.posts.create(
        body={"text": "Hello World!"}
    )
except HTTPError as e:
    if e.response.status_code == 429:
        # Get reset time from headers
        reset_time = int(e.response.headers.get('x-rate-limit-reset', 0))
        wait_time = reset_time - time.time()
        
        if wait_time > 0:
            print(f"Rate limited. Waiting {wait_time:.0f} seconds...")
            time.sleep(wait_time + 1)  # Add 1 second buffer
            # Retry request
    else:
        raise

Rate Limit Strategies

1. Exponential Backoff

Implement exponential backoff for retries:
import time
import random
from requests.exceptions import HTTPError

def make_request_with_backoff(func, max_retries=5):
    """Execute request with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except HTTPError as e:
            if e.response.status_code == 429:
                if attempt == max_retries - 1:
                    raise  # Max retries reached
                
                # Calculate backoff: 2^attempt + jitter
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"Rate limited. Retry {attempt + 1}/{max_retries} in {wait_time:.2f}s")
                time.sleep(wait_time)
            else:
                raise

# Use with requests
response = make_request_with_backoff(
    lambda: client.posts.create(body={"text": "Test"})
)

2. Request Queue with Rate Limiting

Control request rate proactively:
import time
from collections import deque

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window  # in seconds
        self.requests = deque()
    
    def wait_if_needed(self):
        """Wait if rate limit would be exceeded"""
        now = time.time()
        
        # Remove requests outside time window
        while self.requests and self.requests[0] < now - self.time_window:
            self.requests.popleft()
        
        # Check if at limit
        if len(self.requests) >= self.max_requests:
            wait_time = self.time_window - (now - self.requests[0])
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.2f}s...")
                time.sleep(wait_time)
            # After waiting, only the oldest request has left the window;
            # drop just that one (clearing would forget in-window requests
            # and allow the limit to be exceeded)
            self.requests.popleft()
        
        # Record this request
        self.requests.append(now)

# Create rate limiter: 50 requests per 15 minutes
limiter = RateLimiter(max_requests=50, time_window=900)

# Use before each request
for i in range(100):
    limiter.wait_if_needed()
    response = client.posts.get_by_id(id=f"tweet_{i}")
    print(f"Processed tweet {i}")
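A token bucket is a common alternative to the sliding window above: capacity refills continuously, which smooths traffic instead of permitting a full burst at each window boundary. A minimal sketch, not part of any SDK:

```python
import time

class TokenBucket:
    """Token-bucket limiter: tokens refill continuously at a fixed rate."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last = time.monotonic()

    def acquire(self) -> float:
        """Take one token, sleeping if necessary; returns seconds waited."""
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens < 1:
            wait = (1 - self.tokens) / self.rate
            time.sleep(wait)
            self.tokens = 0  # the sleep earned exactly the missing fraction
            self.last = time.monotonic()
            return wait
        self.tokens -= 1
        return 0.0

# 50 requests per 15-minute window ~= one token every 18 seconds,
# with small bursts of up to 5 allowed
bucket = TokenBucket(rate=50 / 900, capacity=5)
```

Choosing between the two is a trade-off: the sliding window matches the API's accounting exactly, while the token bucket spreads requests more evenly over time.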

3. Batch Requests

Use batch endpoints to reduce request count:
# ❌ Inefficient: 100 requests
for tweet_id in tweet_ids:
    tweet = client.posts.get_by_id(id=tweet_id)
    process(tweet)

# ✅ Efficient: 1 request per 100 tweets
for i in range(0, len(tweet_ids), 100):
    batch = tweet_ids[i:i+100]
    response = client.posts.get_by_ids(
        ids=batch,
        tweet_fields=["created_at", "public_metrics"]
    )
    for tweet in response.data:
        process(tweet)

Authentication and Rate Limits

OAuth 2.0 vs Bearer Token

Choose authentication based on your needs:
from xdk import Client

# Bearer Token: App-level rate limits
client_app = Client(bearer_token="YOUR_BEARER_TOKEN")

# OAuth 2.0: Per-user rate limits (typically higher)
client_user = Client(
    access_token="USER_ACCESS_TOKEN",
    # Often has higher limits for user-specific operations
)

# Use appropriate auth based on operation
# Read-only: Bearer token is simpler
tweets = client_app.posts.search_recent(query="python")

# Write operations: OAuth required
post = client_user.posts.create(body={"text": "Hello!"})
Rate Limit Differences:

| Operation | Bearer Token | OAuth 2.0 User Context |
| --- | --- | --- |
| Read public data | Lower limits | Higher limits |
| Write posts | Not allowed | Allowed with limits |
| Access user data | Limited | Full access |
| Multiple users | App-wide limit | Per-user limits |

Pagination Best Practices

Control Request Rate

import time

# Add delays between pages
for page in client.posts.search_recent(
    query="python",
    max_results=100
):
    for tweet in page.data:
        process_tweet(tweet)
    
    # Wait 1 second between pages
    time.sleep(1)

Limit Total Requests

# Only fetch first N pages
max_pages = 10
page_count = 0

for page in client.posts.search_recent(query="machine learning"):
    process_page(page)
    
    page_count += 1
    if page_count >= max_pages:
        break

Optimize Field Selection

Request only the fields you need:
# ❌ Requesting all fields (slower, counts against limits)
response = client.posts.search_recent(
    query="python",
    tweet_fields=["created_at", "author_id", "entities", "public_metrics", 
                  "referenced_tweets", "conversation_id", "reply_settings"],
    expansions=["author_id", "referenced_tweets.id", "attachments.media_keys"],
    user_fields=["username", "verified", "description", "profile_image_url"],
    media_fields=["url", "preview_image_url", "type", "width", "height"]
)

# ✅ Request only needed fields (faster, more efficient)
response = client.posts.search_recent(
    query="python",
    tweet_fields=["created_at", "public_metrics"],
    expansions=["author_id"],
    user_fields=["username"]
)

Caching Strategies

Cache responses to reduce API calls:
import json
import time
from pathlib import Path

class CachedClient:
    def __init__(self, client, cache_dir=".cache", ttl=3600):
        self.client = client
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.ttl = ttl  # Time to live in seconds
    
    def get_user_cached(self, user_id):
        """Get user with caching"""
        cache_file = self.cache_dir / f"user_{user_id}.json"
        
        # Check cache
        if cache_file.exists():
            cache_age = time.time() - cache_file.stat().st_mtime
            if cache_age < self.ttl:
                with open(cache_file) as f:
                    return json.load(f)
        
        # Fetch from API
        response = self.client.users.get_by_id(
            id=user_id,
            user_fields=["username", "verified", "public_metrics"]
        )
        
        # Cache response
        with open(cache_file, 'w') as f:
            json.dump(response.data.model_dump(), f)
        
        return response.data

# Use cached client
cached = CachedClient(client, ttl=3600)  # 1 hour cache

# First call hits API
user = cached.get_user_cached("123456789")

# Subsequent calls use cache (within 1 hour)
user = cached.get_user_cached("123456789")  # From cache

Monitoring Usage

Track API Calls

import logging
from datetime import datetime

class UsageTracker:
    def __init__(self):
        self.calls = {}
        self.logger = logging.getLogger(__name__)
    
    def track_call(self, endpoint):
        """Track API call"""
        if endpoint not in self.calls:
            self.calls[endpoint] = []
        
        self.calls[endpoint].append(datetime.now())
    
    def get_stats(self, minutes=15):
        """Get call statistics for time window"""
        cutoff = datetime.now().timestamp() - (minutes * 60)
        
        stats = {}
        for endpoint, calls in self.calls.items():
            recent = [c for c in calls if c.timestamp() > cutoff]
            stats[endpoint] = len(recent)
        
        return stats
    
    def print_stats(self):
        """Print usage statistics"""
        stats = self.get_stats(15)
        print("\n=== API Usage (Last 15 min) ===")
        for endpoint, count in stats.items():
            print(f"{endpoint}: {count} calls")

# Use tracker
tracker = UsageTracker()

# Track each call
tracker.track_call("posts.search_recent")
response = client.posts.search_recent(query="test")

tracker.track_call("users.get_by_id")
user = client.users.get_by_id(id="123")

# Print stats
tracker.print_stats()
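Rather than calling `track_call` by hand before every request, a small decorator can count calls automatically. This is an illustrative sketch; `tracked` is not part of the SDK:

```python
from functools import wraps

call_counts: dict[str, int] = {}

def tracked(endpoint: str):
    """Count calls per endpoint before forwarding to the wrapped function."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            call_counts[endpoint] = call_counts.get(endpoint, 0) + 1
            return func(*args, **kwargs)
        return wrapper
    return decorator

@tracked("posts.search_recent")
def search_recent(query: str) -> str:
    return f"results for {query}"  # stand-in for the real API call

search_recent("python")
search_recent("rust")
print(call_counts)  # {'posts.search_recent': 2}
```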

Error Recovery Patterns

Comprehensive Error Handling

import time
from requests.exceptions import HTTPError, ConnectionError, Timeout

def robust_api_call(func, max_retries=3):
    """Make API call with comprehensive error handling"""
    for attempt in range(max_retries):
        try:
            return func()
            
        except HTTPError as e:
            status = e.response.status_code
            
            if status == 429:
                # Rate limited - wait and retry
                reset = int(e.response.headers.get('x-rate-limit-reset', 0))
                wait = max(reset - time.time(), 0) + 1
                print(f"Rate limited. Waiting {wait:.0f}s...")
                time.sleep(wait)
                
            elif status in (500, 502, 503, 504):
                # Server error - retry with backoff
                wait = 2 ** attempt
                print(f"Server error {status}. Retrying in {wait}s...")
                time.sleep(wait)
                
            elif status in (401, 403):
                # Auth error - don't retry
                print(f"Authentication failed: {e.response.text}")
                raise
                
            else:
                # Other error - don't retry
                print(f"Client error {status}: {e.response.text}")
                raise
                
        except (ConnectionError, Timeout) as e:
            # Network error - retry
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Network error. Retrying in {wait}s...")
                time.sleep(wait)
            else:
                raise
    
    raise Exception(f"Max retries ({max_retries}) exceeded")

# Use robust caller
response = robust_api_call(
    lambda: client.posts.create(body={"text": "Test post"})
)

Best Practices Summary

1. Monitor rate limits

Track usage and implement monitoring:

# Check remaining calls before critical operations
if remaining < 10:
    print("Low on API calls, slowing down...")
    time.sleep(60)

2. Use batch operations

Fetch multiple items in single requests:

# Batch instead of individual requests
response = client.posts.get_by_ids(ids=batch)

3. Implement caching

Cache data that doesn't change frequently:

# Cache user profiles for 1 hour
cached_users = {}

4. Add delays

Pace your requests to stay within limits:

# Small delay between requests
time.sleep(0.1)

5. Handle errors gracefully

Always catch and handle rate limit errors:

try:
    response = make_request()
except HTTPError as e:
    if e.response.status_code == 429:
        handle_rate_limit(e)

For production applications, implement comprehensive monitoring, caching, and rate limiting strategies from the start. This prevents issues as your usage scales.
