Overview
The XDK Python SDK provides robust streaming capabilities with automatic reconnection, exponential backoff, and comprehensive error handling. The streaming module handles all connection management so you can focus on processing data.
Stream Features
Automatic Reconnection Automatically reconnects on connection drops with exponential backoff
Error Classification Distinguishes between retryable and fatal errors
Lifecycle Callbacks Monitor connection state with event callbacks
Rate Limit Handling Intelligent backoff for rate limit errors
Basic Streaming
Stream Filtered Posts
from xdk import Client

client = Client(bearer_token="YOUR_BEARER_TOKEN")
# Stream posts matching filter rules
for tweet in client.stream.search_stream():
    print(f"@{tweet.data.author_id}: {tweet.data.text}")
Stream Sample Posts
Get a random sample of all public posts:
# Stream random sample of posts (1% of all posts)
for tweet in client.stream.sample_stream(
    tweet_fields=["created_at", "author_id", "public_metrics"],
    expansions=["author_id"],
    user_fields=["username", "verified"]
):
    print(f"{tweet.data.created_at}: {tweet.data.text}")
Stream Configuration
from xdk.streaming import StreamConfig
# Custom stream configuration
config = StreamConfig(
    max_retries=5,           # Max reconnection attempts (-1 for infinite)
    initial_backoff=2.0,     # Start with 2 second delay
    max_backoff=120.0,       # Cap backoff at 2 minutes
    backoff_multiplier=2.0,  # Double delay each retry
    jitter=True,             # Add random jitter to backoff
    timeout=90.0,            # 90 second request timeout
    chunk_size=2048          # Read 2KB chunks
)

# Use custom config
for tweet in client.stream.search_stream(stream_config=config):
    process_tweet(tweet)
Configuration Options:
| Parameter | Default | Description |
|---|---|---|
| max_retries | 10 | Maximum reconnection attempts. Use -1 for infinite retries |
| initial_backoff | 1.0 | Initial retry delay in seconds |
| max_backoff | 64.0 | Maximum retry delay in seconds |
| backoff_multiplier | 2.0 | Backoff multiplier for exponential increase |
| jitter | True | Add random jitter (0-25%) to backoff times |
| timeout | None | Request timeout in seconds |
| chunk_size | 1024 | Chunk size for reading stream |
Lifecycle Callbacks
Monitor stream connection state with callbacks:
def on_connect():
    print("Stream connected!")

def on_disconnect(exception):
    if exception:
        print(f"Disconnected due to: {exception}")
    else:
        print("Stream ended normally")

def on_reconnect(attempt, delay):
    print(f"Reconnecting (attempt {attempt}) in {delay:.2f}s...")

def on_error(error):
    print(f"Error occurred: {error.error_type} - {error}")

# Configure callbacks
config = StreamConfig(
    on_connect=on_connect,
    on_disconnect=on_disconnect,
    on_reconnect=on_reconnect,
    on_error=on_error
)

for tweet in client.stream.search_stream(stream_config=config):
    process_tweet(tweet)
Error Handling
Error Types
The SDK classifies errors into retryable and non-retryable categories:
from xdk.streaming import StreamError, StreamErrorType

try:
    for tweet in client.stream.search_stream():
        process_tweet(tweet)
except StreamError as e:
    if e.error_type == StreamErrorType.RATE_LIMITED:
        print("Rate limited. Retry after backoff.")
    elif e.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        print(f"Auth failed: {e}")
    elif e.error_type == StreamErrorType.CONNECTION_ERROR:
        print(f"Connection lost: {e}")
    else:
        print(f"Stream error: {e}")
Error Types:
| Error Type | Retryable | Description |
|---|---|---|
| CONNECTION_ERROR | Yes | Network connection issues |
| TIMEOUT | Yes | Request timeout |
| SERVER_ERROR | Yes | 5xx server errors |
| RATE_LIMITED | Yes | 429 rate limit errors |
| STREAM_INTERRUPTED | Yes | Stream disconnected mid-transfer |
| AUTHENTICATION_ERROR | No | 401/403 auth failures |
| CLIENT_ERROR | No | Other 4xx client errors |
| FATAL_ERROR | No | Unrecoverable errors |
Retry Logic
# The SDK automatically handles retries with exponential backoff:
#
# Attempt 1: Wait 1.0s (initial_backoff)
# Attempt 2: Wait 2.0s (1.0 * 2.0)
# Attempt 3: Wait 4.0s (2.0 * 2.0)
# Attempt 4: Wait 8.0s (4.0 * 2.0)
# Attempt 5: Wait 16.0s (8.0 * 2.0)
# ...
# Capped at max_backoff (64.0s by default)
Advanced Stream Processing
Filter and Process Stream
import json
from datetime import datetime
class StreamProcessor:
    def __init__(self, client):
        self.client = client
        self.tweet_count = 0
        self.start_time = datetime.now()

    def process_stream(self):
        """Process filtered stream with stats"""
        config = StreamConfig(
            on_connect=self.on_connect,
            on_error=self.on_error,
            max_retries=-1  # Infinite retries
        )
        for tweet in self.client.stream.search_stream(
            stream_config=config,
            tweet_fields=["created_at", "author_id", "public_metrics", "entities"],
            expansions=["author_id"],
            user_fields=["username", "verified"]
        ):
            self.process_tweet(tweet)

    def process_tweet(self, tweet):
        """Process individual tweet"""
        self.tweet_count += 1
        # Extract data
        text = tweet.data.text
        author_id = tweet.data.author_id
        metrics = tweet.data.public_metrics
        # Filter high-engagement tweets
        if metrics and metrics.like_count > 100:
            print(f"High engagement tweet: {text[:50]}...")
            print(f"Likes: {metrics.like_count}")
        # Print stats every 100 tweets
        if self.tweet_count % 100 == 0:
            elapsed = (datetime.now() - self.start_time).seconds
            rate = self.tweet_count / elapsed if elapsed > 0 else 0
            print(f"Processed {self.tweet_count} tweets ({rate:.2f}/sec)")

    def on_connect(self):
        print(f"Stream connected at {datetime.now()}")

    def on_error(self, error):
        print(f"Stream error: {error.error_type}")

# Run processor
processor = StreamProcessor(client)
processor.process_stream()
Managing Stream Rules
Configure what posts appear in your filtered stream:
from xdk.stream.models import AddRulesRequest, Rule
# Add stream rules
rules = [
    Rule(value="python programming -is:retweet", tag="python-tweets"),
    Rule(value="from:xdevelopers", tag="x-dev-tweets"),
    Rule(value="#MachineLearning lang:en", tag="ml-tweets")
]
response = client.stream.add_rules(
    body=AddRulesRequest(add=rules)
)
for rule in response.data:
    print(f"Rule added: {rule.tag} - {rule.value}")

# Get current rules
response = client.stream.get_rules()
for rule in response.data:
    print(f"ID: {rule.id}, Tag: {rule.tag}, Value: {rule.value}")

# Delete rules
from xdk.stream.models import DeleteRulesRequest

response = client.stream.delete_rules(
    body=DeleteRulesRequest(
        delete={"ids": ["rule_id_1", "rule_id_2"]}
    )
)
Stream Monitoring
Track Stream Health
import time
from collections import deque
class StreamMonitor:
    def __init__(self, client):
        self.client = client
        self.recent_rates = deque(maxlen=10)  # Last 10 minute averages
        self.error_count = 0
        self.reconnect_count = 0

    def monitor_stream(self):
        config = StreamConfig(
            on_reconnect=self.on_reconnect,
            on_error=self.on_error,
            max_retries=-1
        )
        tweet_count = 0
        start_time = time.time()
        for tweet in self.client.stream.search_stream(stream_config=config):
            tweet_count += 1
            # Calculate rate every minute
            elapsed = time.time() - start_time
            if elapsed >= 60:
                rate = tweet_count / elapsed
                self.recent_rates.append(rate)
                # Alert on significant drops
                if len(self.recent_rates) >= 3:
                    avg_rate = sum(self.recent_rates) / len(self.recent_rates)
                    if rate < avg_rate * 0.5:
                        print(f"⚠️ Stream rate dropped: {rate:.2f} tweets/sec")
                # Reset counters
                tweet_count = 0
                start_time = time.time()

    def on_reconnect(self, attempt, delay):
        self.reconnect_count += 1
        print(f"Reconnection #{self.reconnect_count} (attempt {attempt})")

    def on_error(self, error):
        self.error_count += 1
        print(f"Error #{self.error_count}: {error.error_type}")

# Monitor stream
monitor = StreamMonitor(client)
monitor.monitor_stream()
Best Practices
Use infinite retries for production
Set max_retries=-1 for long-running streams: config = StreamConfig(max_retries=-1)
Implement graceful shutdown
Handle interrupts cleanly:

import signal
import sys

def signal_handler(sig, frame):
    print("\nShutting down stream...")
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
for tweet in client.stream.search_stream():
    process_tweet(tweet)
Log stream events
Use callbacks to log connection events:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

config = StreamConfig(
    on_connect=lambda: logger.info("Connected"),
    on_disconnect=lambda e: logger.warning(f"Disconnected: {e}"),
    on_reconnect=lambda a, d: logger.info(f"Reconnecting in {d}s")
)
Buffer data for reliability
Queue tweets for processing to prevent blocking the stream:

from queue import Queue
from threading import Thread

tweet_queue = Queue(maxsize=1000)

def stream_worker():
    for tweet in client.stream.search_stream():
        tweet_queue.put(tweet)

def process_worker():
    while True:
        tweet = tweet_queue.get()
        process_tweet(tweet)
        tweet_queue.task_done()

# Start workers
Thread(target=stream_worker, daemon=True).start()
Thread(target=process_worker, daemon=True).start()
For production systems, always use lifecycle callbacks to monitor stream health and implement alerting for prolonged disconnections.