Overview
The XDK provides robust streaming capabilities with automatic reconnection, exponential backoff, and comprehensive error handling. The streaming utilities handle all connection management automatically, so you can focus on processing events.
StreamConfig
Configure streaming behavior with the StreamConfig dataclass:
from xdk.streaming import StreamConfig

config = StreamConfig(
    max_retries=10,           # Maximum reconnection attempts (-1 for infinite)
    initial_backoff=1.0,      # Initial backoff delay in seconds
    max_backoff=64.0,         # Maximum backoff delay in seconds
    backoff_multiplier=2.0,   # Exponential backoff multiplier
    jitter=True,              # Add random jitter to backoff
    timeout=None,             # Request timeout (None for no timeout)
    chunk_size=1024,          # Stream chunk size in bytes
    on_connect=lambda: print("Connected!"),
    on_disconnect=lambda exc: print(f"Disconnected: {exc}"),
    on_reconnect=lambda attempt, delay: print(f"Reconnecting (attempt {attempt})..."),
    on_error=lambda err: print(f"Error: {err}")
)
Configuration Parameters
max_retries
int
Maximum number of reconnection attempts. Set to -1 for infinite retries.
initial_backoff
float
Initial backoff delay in seconds before the first retry.
max_backoff
float
Maximum backoff delay in seconds (backoff is capped at this value).
backoff_multiplier
float
Multiplier for the exponential backoff calculation.
jitter
bool
Whether to add random jitter (0-25%) to backoff times.
timeout
Optional[float]
default: "None"
Request timeout in seconds. None for no timeout.
chunk_size
int
Size of chunks to read from the stream, in bytes.
on_connect
Optional[Callable[[], None]]
Callback invoked when connection is established.
on_disconnect
Optional[Callable[[Optional[Exception]], None]]
Callback invoked when connection is lost (receives the exception if any).
on_reconnect
Optional[Callable[[int, float], None]]
Callback invoked when reconnection is attempted (receives attempt number and delay).
on_error
Optional[Callable[[StreamError], None]]
Callback invoked when an error occurs (receives StreamError instance).
StreamError
Errors during streaming are classified and wrapped in StreamError:
class StreamError(Exception):
    def __init__(
        self,
        message: str,
        error_type: StreamErrorType,
        original_exception: Optional[Exception] = None,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
    ): ...

    @property
    def is_retryable(self) -> bool:
        """Check if this error type should be retried."""
Error Attributes
message - Human-readable error description
error_type - Classification of error (see StreamErrorType)
original_exception - The underlying exception that caused the error
status_code - HTTP status code (if applicable)
response_body - Response body text (if applicable)
is_retryable - Whether this error should trigger a retry
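The attributes above can be sketched with a minimal standalone version. This is an illustrative stand-in, not the XDK's implementation: `DemoStreamError`, `ErrorType`, and the `RETRYABLE` set are hypothetical names that mirror the interface shown.

```python
from enum import Enum
from typing import Optional

class ErrorType(Enum):
    CONNECTION_ERROR = "connection_error"
    RATE_LIMITED = "rate_limited"
    AUTHENTICATION_ERROR = "authentication_error"

# Error types that should trigger a retry (subset, for illustration)
RETRYABLE = {ErrorType.CONNECTION_ERROR, ErrorType.RATE_LIMITED}

class DemoStreamError(Exception):
    def __init__(self, message: str, error_type: ErrorType,
                 status_code: Optional[int] = None):
        super().__init__(message)
        self.message = message
        self.error_type = error_type
        self.status_code = status_code

    @property
    def is_retryable(self) -> bool:
        # Retryability is derived from the error classification
        return self.error_type in RETRYABLE

err = DemoStreamError("HTTP 429", ErrorType.RATE_LIMITED, status_code=429)
print(err.is_retryable)  # True
```

The key design point is that `is_retryable` is a property computed from `error_type`, so callers never hard-code status codes in their retry logic.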
StreamErrorType
Errors are classified into retryable and non-retryable types:
class StreamErrorType(Enum):
    # Retryable errors - connection issues that may resolve
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"              # 5xx errors
    RATE_LIMITED = "rate_limited"              # 429 errors
    STREAM_INTERRUPTED = "stream_interrupted"

    # Non-retryable errors - client issues that won't resolve
    AUTHENTICATION_ERROR = "authentication_error"  # 401/403
    CLIENT_ERROR = "client_error"                  # Other 4xx errors
    FATAL_ERROR = "fatal_error"
Retryable vs Non-Retryable
Retryable errors trigger automatic reconnection:
Network connection errors
Timeouts
Server errors (5xx)
Rate limiting (429)
Stream interruptions
Non-retryable errors immediately raise:
Authentication failures (401, 403)
Client errors (other 4xx)
Fatal/unexpected errors
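One plausible way the HTTP-status side of this classification could work is sketched below. The real mapping lives inside the XDK; `classify_status` is a hypothetical helper shown only to make the retryable/non-retryable split concrete.

```python
def classify_status(status_code: int) -> tuple:
    """Map an HTTP status code to (error type value, is_retryable)."""
    if status_code == 429:
        return ("rate_limited", True)          # back off, then retry
    if status_code in (401, 403):
        return ("authentication_error", False) # credentials won't fix themselves
    if 500 <= status_code < 600:
        return ("server_error", True)          # transient server-side issue
    if 400 <= status_code < 500:
        return ("client_error", False)         # request itself is bad
    return ("fatal_error", False)

print(classify_status(503))  # ('server_error', True)
print(classify_status(404))  # ('client_error', False)
```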
StreamState
Internal state tracking for streaming connections:
@dataclass
class StreamState:
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False
    total_items_received: int = 0
    last_error: Optional[StreamError] = None
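To illustrate how these fields evolve, here is a toy reconnect loop with no network I/O. The update logic is an assumption about how such state machinery typically behaves (grow backoff on failure, reset on success), not the XDK's actual internals.

```python
from dataclasses import dataclass

@dataclass
class State:
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False

def simulate(failures_before_success: int, multiplier: float = 2.0,
             max_backoff: float = 64.0) -> State:
    state = State()
    while not state.is_connected:
        if state.retry_count < failures_before_success:
            # Connection attempt failed: count the retry and grow the backoff
            state.retry_count += 1
            state.current_backoff = min(state.current_backoff * multiplier,
                                        max_backoff)
        else:
            # Connected: reset the retry bookkeeping
            state.is_connected = True
            state.retry_count = 0
            state.current_backoff = 1.0
    return state

print(simulate(3))  # connected, with retry_count and backoff reset
```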
Basic Streaming
Use the streaming methods provided by endpoint clients:
from xdk import Client
from xdk.streaming import StreamConfig

client = Client(bearer_token="your_token")

# Configure streaming behavior
config = StreamConfig(
    max_retries=20,
    initial_backoff=2.0,
    on_connect=lambda: print("Stream connected"),
    on_error=lambda e: print(f"Stream error: {e.message}")
)

# Start streaming
for event in client.stream.sample_stream(config=config):
    print(f"Received: {event}")
Filtered Stream Example
from xdk import Client
from xdk.streaming import StreamConfig, StreamError, StreamErrorType

client = Client(bearer_token="your_token")

# Add rules for filtering
client.stream.add_or_update_rules({
    "add": [
        {"value": "python programming", "tag": "python"},
        {"value": "machine learning", "tag": "ml"}
    ]
})

# Configure with lifecycle callbacks
def on_connect():
    print("Connected to filtered stream")

def on_disconnect(exc):
    if exc:
        print(f"Disconnected due to: {exc}")
    else:
        print("Stream ended normally")

def on_reconnect(attempt, delay):
    print(f"Reconnecting in {delay:.1f}s (attempt {attempt})")

def on_error(error):
    print(f"Error [{error.error_type.value}]: {error.message}")
    if error.status_code:
        print(f"HTTP {error.status_code}")

config = StreamConfig(
    max_retries=-1,  # Infinite retries
    on_connect=on_connect,
    on_disconnect=on_disconnect,
    on_reconnect=on_reconnect,
    on_error=on_error
)

# Stream matching posts
try:
    for post in client.stream.search_stream(config=config):
        print(f"{post.data.author_id}: {post.data.text}")

        # Access matching rules
        if hasattr(post, 'matching_rules'):
            for rule in post.matching_rules:
                print(f"  Matched rule: {rule.tag}")
except StreamError as e:
    if e.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        print("Authentication failed - check your credentials")
    elif e.error_type == StreamErrorType.RATE_LIMITED:
        print("Rate limited - wait before reconnecting")
    else:
        print(f"Stream failed: {e.message}")
Advanced Error Handling
from xdk.streaming import StreamConfig, StreamError, StreamErrorType
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Track errors for monitoring
error_counts = {
    StreamErrorType.CONNECTION_ERROR: 0,
    StreamErrorType.TIMEOUT: 0,
    StreamErrorType.RATE_LIMITED: 0,
}

def handle_error(error: StreamError):
    logger.error(f"Stream error: {error.message}")
    if error.error_type in error_counts:
        error_counts[error.error_type] += 1

    # Log additional context
    if error.status_code:
        logger.error(f"HTTP Status: {error.status_code}")
    if error.response_body:
        logger.error(f"Response: {error.response_body[:200]}")

    # Custom handling based on error type
    if error.error_type == StreamErrorType.RATE_LIMITED:
        logger.warning("Rate limited - consider reducing request rate")
    elif error.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        logger.critical("Auth failed - check credentials and scopes")
        # Could trigger alert/notification here

config = StreamConfig(
    max_retries=50,
    initial_backoff=5.0,
    max_backoff=300.0,  # Max 5 minutes
    on_error=handle_error
)

try:
    for event in client.stream.sample_stream(config=config):
        process_event(event)
except StreamError as e:
    logger.error(f"Fatal stream error: {e}")
    logger.info(f"Error statistics: {error_counts}")
Exponential Backoff
The streaming system uses exponential backoff with jitter:
# Backoff calculation:
backoff = initial_backoff * (multiplier ** retry_count)
backoff = min(backoff, max_backoff)
if jitter:
    jitter_amount = backoff * random.uniform(0, 0.25)
    backoff += jitter_amount
Example progression (initial=1.0, multiplier=2.0, max=64.0, jitter=True):
Attempt 1: ~1.0-1.25s
Attempt 2: ~2.0-2.5s
Attempt 3: ~4.0-5.0s
Attempt 4: ~8.0-10.0s
Attempt 5: ~16.0-20.0s
Attempt 6: ~32.0-40.0s
Attempt 7+: ~64.0-80.0s (capped at max_backoff)
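The progression above can be reproduced with a small standalone function using the same formula, with jitter omitted so the base values are deterministic. It assumes `retry_count` starts at 0 on the first attempt.

```python
def backoff_for(retry_count: int, initial: float = 1.0,
                multiplier: float = 2.0, max_backoff: float = 64.0) -> float:
    """Base backoff (before jitter), capped at max_backoff."""
    return min(initial * (multiplier ** retry_count), max_backoff)

for attempt in range(1, 8):
    base = backoff_for(attempt - 1)
    # With jitter=True, up to 25% is added on top of the base value
    print(f"Attempt {attempt}: {base:.1f}s - {base * 1.25:.1f}s")
```

Running this prints the same 1.0 → 2.0 → 4.0 → ... → 64.0 ladder as the table, confirming the cap kicks in from attempt 7 onward.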
Custom Stream Processing
from xdk.streaming import StreamConfig, StreamError
import time

class StreamProcessor:
    def __init__(self, client):
        self.client = client
        self.processed_count = 0
        self.error_count = 0

    def on_connect(self):
        print(f"Stream connected at {time.time()}")

    def on_disconnect(self, exc):
        print(f"Disconnected after {self.processed_count} items")

    def on_error(self, error):
        self.error_count += 1
        if error.is_retryable:
            print(f"Retryable error #{self.error_count}: {error.message}")
        else:
            print(f"Fatal error: {error.message}")

    def process_stream(self):
        config = StreamConfig(
            max_retries=-1,
            on_connect=self.on_connect,
            on_disconnect=self.on_disconnect,
            on_error=self.on_error,
            timeout=90.0  # 90 second timeout
        )

        try:
            for event in self.client.stream.sample_stream(config=config):
                self.process_event(event)
                self.processed_count += 1

                # Periodic status
                if self.processed_count % 1000 == 0:
                    print(f"Processed {self.processed_count} events")
        except KeyboardInterrupt:
            print(f"\nStopped after {self.processed_count} events")
        except StreamError as e:
            print(f"Stream failed: {e}")

    def process_event(self, event):
        # Your processing logic
        pass

# Usage
processor = StreamProcessor(client)
processor.process_stream()
Best Practices
Configure Appropriate Retries - Set max_retries=-1 for long-running streams, or use finite retries for batch processing.
Use Lifecycle Callbacks - Implement callbacks for monitoring, logging, and alerting on stream health.
Handle Non-Retryable Errors - Catch authentication and client errors separately; they require different handling than connection issues.
Set Reasonable Timeouts - Configure timeouts based on your use case: longer for stable connections, shorter for quick failure detection.
Monitor Error Patterns - Track error types and frequencies to identify infrastructure issues or API problems.