Skip to main content

Overview

The XDK Python SDK provides robust streaming capabilities with automatic reconnection, exponential backoff, and comprehensive error handling. The streaming module handles all connection management so you can focus on processing data.

Stream Features

Automatic Reconnection

Automatically reconnects on connection drops with exponential backoff

Error Classification

Distinguishes between retryable and fatal errors

Lifecycle Callbacks

Monitor connection state with event callbacks

Rate Limit Handling

Intelligent backoff for rate limit errors

Basic Streaming

Stream Filtered Posts

from xdk import Client

# NOTE: replace the placeholder with your real bearer token.
client = Client(bearer_token="YOUR_BEARER_TOKEN")

# Stream posts matching filter rules
for tweet in client.stream.search_stream():
    # Each streamed item exposes the tweet payload under `.data`.
    print(f"@{tweet.data.author_id}: {tweet.data.text}")

Stream Sample Posts

Get a random sample of all public posts:
# Stream random sample of posts (1% of all posts)
# tweet_fields / expansions / user_fields request extra payload fields
# beyond the defaults returned by the API.
for tweet in client.stream.sample_stream(
    tweet_fields=["created_at", "author_id", "public_metrics"],
    expansions=["author_id"],
    user_fields=["username", "verified"]
):
    print(f"{tweet.data.created_at}: {tweet.data.text}")

Stream Configuration

Configure Retry Behavior

streaming.py:97-128
from xdk.streaming import StreamConfig

# Custom stream configuration
config = StreamConfig(
    max_retries=5,              # Max reconnection attempts (-1 for infinite)
    initial_backoff=2.0,        # Start with 2 second delay
    max_backoff=120.0,          # Cap backoff at 2 minutes
    backoff_multiplier=2.0,     # Double delay each retry
    jitter=True,                # Add random jitter to backoff
    timeout=90.0,               # 90 second request timeout
    chunk_size=2048             # Read 2KB chunks
)

# Use custom config — passed per call, so different streams can use
# different retry policies.
for tweet in client.stream.search_stream(stream_config=config):
    process_tweet(tweet)
Configuration Options:
| Parameter | Default | Description |
| --- | --- | --- |
| `max_retries` | `10` | Maximum reconnection attempts. Use `-1` for infinite retries |
| `initial_backoff` | `1.0` | Initial retry delay in seconds |
| `max_backoff` | `64.0` | Maximum retry delay in seconds |
| `backoff_multiplier` | `2.0` | Backoff multiplier for exponential increase |
| `jitter` | `True` | Add random jitter (0-25%) to backoff times |
| `timeout` | `None` | Request timeout in seconds |
| `chunk_size` | `1024` | Chunk size for reading the stream |

Lifecycle Callbacks

Monitor stream connection state with callbacks:
streaming.py:123-127
def on_connect():
    """Called when the stream connection is established."""
    print("Stream connected!")

def on_disconnect(exception):
    """Called when the stream ends; `exception` is None on a clean end."""
    if exception:
        print(f"Disconnected due to: {exception}")
    else:
        print("Stream ended normally")

def on_reconnect(attempt, delay):
    """Called before a reconnection attempt; `delay` is seconds until retry."""
    print(f"Reconnecting (attempt {attempt}) in {delay:.2f}s...")

def on_error(error):
    """Called with a classified stream error (see `error.error_type`)."""
    print(f"Error occurred: {error.error_type} - {error}")

# Configure callbacks
config = StreamConfig(
    on_connect=on_connect,
    on_disconnect=on_disconnect,
    on_reconnect=on_reconnect,
    on_error=on_error
)

for tweet in client.stream.search_stream(stream_config=config):
    process_tweet(tweet)

Error Handling

Error Types

The SDK classifies errors into retryable and non-retryable categories:
streaming.py:49-62
from xdk.streaming import StreamError, StreamErrorType

# StreamError carries an `error_type` classifying the failure; branch
# on it to decide how to react.
try:
    for tweet in client.stream.search_stream():
        process_tweet(tweet)

except StreamError as e:
    if e.error_type == StreamErrorType.RATE_LIMITED:
        # Plain string: no placeholders, so no f-string needed.
        print("Rate limited. Retry after backoff.")
    elif e.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        print(f"Auth failed: {e}")
    elif e.error_type == StreamErrorType.CONNECTION_ERROR:
        print(f"Connection lost: {e}")
    else:
        print(f"Stream error: {e}")
Error Types:
| Error Type | Retryable | Description |
| --- | --- | --- |
| `CONNECTION_ERROR` | Yes | Network connection issues |
| `TIMEOUT` | Yes | Request timeout |
| `SERVER_ERROR` | Yes | 5xx server errors |
| `RATE_LIMITED` | Yes | 429 rate limit errors |
| `STREAM_INTERRUPTED` | Yes | Stream disconnected mid-transfer |
| `AUTHENTICATION_ERROR` | No | 401/403 auth failures |
| `CLIENT_ERROR` | No | Other 4xx client errors |
| `FATAL_ERROR` | No | Unrecoverable errors |

Retry Logic

streaming.py:220-234
# The SDK automatically handles retries with exponential backoff:
# 
# Attempt 1: Wait 1.0s (initial_backoff)
# Attempt 2: Wait 2.0s (1.0 * 2.0)
# Attempt 3: Wait 4.0s (2.0 * 2.0)
# Attempt 4: Wait 8.0s (4.0 * 2.0)
# Attempt 5: Wait 16.0s (8.0 * 2.0)
# ...
# Capped at max_backoff (64.0s by default)

Advanced Stream Processing

Filter and Process Stream

import json
from datetime import datetime

class StreamProcessor:
    """Consume a filtered stream, flagging high-engagement tweets and
    reporting throughput every 100 tweets."""

    def __init__(self, client):
        self.client = client
        self.tweet_count = 0              # tweets processed so far
        self.start_time = datetime.now()  # basis for the rate calculation

    def process_stream(self):
        """Process filtered stream with stats."""
        # Infinite retries keep a long-running stream alive across
        # transient disconnects.
        config = StreamConfig(
            on_connect=self.on_connect,
            on_error=self.on_error,
            max_retries=-1  # Infinite retries
        )

        for tweet in self.client.stream.search_stream(
            stream_config=config,
            tweet_fields=["created_at", "author_id", "public_metrics", "entities"],
            expansions=["author_id"],
            user_fields=["username", "verified"]
        ):
            self.process_tweet(tweet)

    def process_tweet(self, tweet):
        """Process an individual tweet.

        Args:
            tweet: Stream payload whose ``data`` attribute carries
                ``text``, ``author_id`` and ``public_metrics``.
        """
        self.tweet_count += 1

        # Extract data
        text = tweet.data.text
        author_id = tweet.data.author_id
        metrics = tweet.data.public_metrics

        # Filter high-engagement tweets (metrics may be None if the
        # field was not returned).
        if metrics and metrics.like_count > 100:
            print(f"High engagement tweet: {text[:50]}...")
            print(f"Likes: {metrics.like_count}")

        # Print stats every 100 tweets
        if self.tweet_count % 100 == 0:
            # BUGFIX: timedelta.seconds only yields the seconds
            # component (0-86399, wraps past 24h and drops days);
            # total_seconds() is the true elapsed time.
            elapsed = (datetime.now() - self.start_time).total_seconds()
            rate = self.tweet_count / elapsed if elapsed > 0 else 0
            print(f"Processed {self.tweet_count} tweets ({rate:.2f}/sec)")

    def on_connect(self):
        """Lifecycle callback: the stream (re)connected."""
        print(f"Stream connected at {datetime.now()}")

    def on_error(self, error):
        """Lifecycle callback: a classified stream error occurred."""
        print(f"Stream error: {error.error_type}")

# Run processor
processor = StreamProcessor(client)
processor.process_stream()

Managing Stream Rules

Configure what posts appear in your filtered stream:
from xdk.stream.models import AddRulesRequest, Rule

# Add stream rules
# Each Rule pairs a filter query (`value`) with a human-readable label
# (`tag`) echoed back on matched tweets.
rules = [
    Rule(value="python programming -is:retweet", tag="python-tweets"),
    Rule(value="from:xdevelopers", tag="x-dev-tweets"),
    Rule(value="#MachineLearning lang:en", tag="ml-tweets")
]

response = client.stream.add_rules(
    body=AddRulesRequest(add=rules)
)

for rule in response.data:
    print(f"Rule added: {rule.tag} - {rule.value}")

# Get current rules
response = client.stream.get_rules()
for rule in response.data:
    print(f"ID: {rule.id}, Tag: {rule.tag}, Value: {rule.value}")

# Delete rules
from xdk.stream.models import DeleteRulesRequest

# Deletion is by rule ID (as returned by get_rules), not by value.
response = client.stream.delete_rules(
    body=DeleteRulesRequest(
        delete={"ids": ["rule_id_1", "rule_id_2"]}
    )
)

Stream Monitoring

Track Stream Health

import time
from collections import deque

class StreamMonitor:
    """Watch stream throughput, alerting on sharp rate drops and
    counting errors/reconnects via lifecycle callbacks."""

    def __init__(self, client):
        self.client = client
        # Rolling window of the last 10 per-minute rate samples.
        self.recent_rates = deque(maxlen=10)
        self.error_count = 0
        self.reconnect_count = 0

    def monitor_stream(self):
        """Consume the filtered stream indefinitely, sampling the
        tweet rate roughly once per minute."""
        config = StreamConfig(
            on_reconnect=self.on_reconnect,
            on_error=self.on_error,
            max_retries=-1
        )

        seen = 0
        window_start = time.time()

        for _tweet in self.client.stream.search_stream(stream_config=config):
            seen += 1

            elapsed = time.time() - window_start
            if elapsed < 60:
                continue

            rate = seen / elapsed
            self.recent_rates.append(rate)

            # Alert once there is enough history and the current rate
            # is under half the recent average.
            if len(self.recent_rates) >= 3:
                avg_rate = sum(self.recent_rates) / len(self.recent_rates)
                if rate < avg_rate * 0.5:
                    print(f"⚠️ Stream rate dropped: {rate:.2f} tweets/sec")

            # Start a fresh measurement window.
            seen = 0
            window_start = time.time()

    def on_reconnect(self, attempt, delay):
        """Count reconnections (the delay argument is not used here)."""
        self.reconnect_count += 1
        print(f"Reconnection #{self.reconnect_count} (attempt {attempt})")

    def on_error(self, error):
        """Count classified stream errors."""
        self.error_count += 1
        print(f"Error #{self.error_count}: {error.error_type}")

# Monitor stream
monitor = StreamMonitor(client)
monitor.monitor_stream()

Best Practices

1. Use infinite retries for production

Set max_retries=-1 for long-running streams:
config = StreamConfig(max_retries=-1)
2. Implement graceful shutdown

Handle interrupts cleanly:
import signal
import sys

def signal_handler(sig, frame):
    """Exit cleanly on interrupt so the stream connection is torn down."""
    print("\nShutting down stream...")
    # Equivalent to sys.exit(0): raise SystemExit with a zero status.
    raise SystemExit(0)

signal.signal(signal.SIGINT, signal_handler)

for tweet in client.stream.search_stream():
    process_tweet(tweet)
3. Log stream events

Use callbacks to log connection events:
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

config = StreamConfig(
    on_connect=lambda: logger.info("Connected"),
    on_disconnect=lambda e: logger.warning(f"Disconnected: {e}"),
    on_reconnect=lambda a, d: logger.info(f"Reconnecting in {d}s")
)
4. Buffer data for reliability

Queue tweets for processing to prevent blocking the stream:
from queue import Queue
from threading import Thread

tweet_queue = Queue(maxsize=1000)

def stream_worker():
    """Producer: push every streamed tweet onto the shared queue."""
    stream = client.stream.search_stream()
    for item in stream:
        tweet_queue.put(item)

def process_worker():
    """Consumer: take tweets off the queue and process them forever."""
    while True:
        item = tweet_queue.get()
        process_tweet(item)
        tweet_queue.task_done()

# Start workers
Thread(target=stream_worker, daemon=True).start()
Thread(target=process_worker, daemon=True).start()
For production systems, always use lifecycle callbacks to monitor stream health and implement alerting for prolonged disconnections.

Build docs developers (and LLMs) love