Overview

The XDK streaming utilities reconnect automatically, apply exponential backoff between attempts, and classify errors as retryable or non-retryable. Connection management is handled for you, so your code only needs to process events.

StreamConfig

Configure streaming behavior with the StreamConfig dataclass:
from xdk.streaming import StreamConfig

config = StreamConfig(
    max_retries=10,              # Maximum reconnection attempts (-1 for infinite)
    initial_backoff=1.0,         # Initial backoff delay in seconds
    max_backoff=64.0,            # Maximum backoff delay in seconds
    backoff_multiplier=2.0,      # Exponential backoff multiplier
    jitter=True,                 # Add random jitter to backoff
    timeout=None,                # Request timeout (None for no timeout)
    chunk_size=1024,             # Stream chunk size in bytes
    on_connect=lambda: print("Connected!"),
    on_disconnect=lambda exc: print(f"Disconnected: {exc}"),
    on_reconnect=lambda attempt, delay: print(f"Reconnecting (attempt {attempt})..."),
    on_error=lambda err: print(f"Error: {err}")
)

Configuration Parameters

  • max_retries (int, default: 10) - Maximum number of reconnection attempts. Set to -1 for infinite retries.
  • initial_backoff (float, default: 1.0) - Backoff delay in seconds before the first retry.
  • max_backoff (float, default: 64.0) - Maximum backoff delay in seconds (the base delay is capped at this value).
  • backoff_multiplier (float, default: 2.0) - Multiplier for the exponential backoff calculation.
  • jitter (bool, default: True) - Whether to add random jitter of 0-25% to each backoff delay.
  • timeout (Optional[float], default: None) - Request timeout in seconds. None for no timeout.
  • chunk_size (int, default: 1024) - Size in bytes of the chunks read from the stream.
  • on_connect (Optional[Callable[[], None]]) - Callback invoked when the connection is established.
  • on_disconnect (Optional[Callable[[Optional[Exception]], None]]) - Callback invoked when the connection is lost (receives the exception, if any).
  • on_reconnect (Optional[Callable[[int, float], None]]) - Callback invoked when a reconnection is attempted (receives the attempt number and the delay).
  • on_error (Optional[Callable[[StreamError], None]]) - Callback invoked when an error occurs (receives a StreamError instance).

StreamError

Errors during streaming are classified and wrapped in StreamError:
class StreamError(Exception):
    def __init__(
        self,
        message: str,
        error_type: StreamErrorType,
        original_exception: Optional[Exception] = None,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
    ) -> None: ...
    
    @property
    def is_retryable(self) -> bool:
        """Check if this error type should be retried."""

Error Attributes

  • message - Human-readable error description
  • error_type - Classification of error (see StreamErrorType)
  • original_exception - The underlying exception that caused the error
  • status_code - HTTP status code (if applicable)
  • response_body - Response body text (if applicable)
  • is_retryable - Whether this error should trigger a retry

StreamErrorType

Errors are classified into retryable and non-retryable types:
class StreamErrorType(Enum):
    # Retryable errors - connection issues that may resolve
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"          # 5xx errors
    RATE_LIMITED = "rate_limited"          # 429 errors
    STREAM_INTERRUPTED = "stream_interrupted"
    
    # Non-retryable errors - client issues that won't resolve
    AUTHENTICATION_ERROR = "authentication_error"  # 401/403
    CLIENT_ERROR = "client_error"          # Other 4xx errors
    FATAL_ERROR = "fatal_error"

Retryable vs Non-Retryable

Retryable errors trigger automatic reconnection:
  • Network connection errors
  • Timeouts
  • Server errors (5xx)
  • Rate limiting (429)
  • Stream interruptions
Non-retryable errors are raised immediately, without a retry:
  • Authentication failures (401, 403)
  • Client errors (other 4xx)
  • Fatal/unexpected errors
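
The split above can be sketched as a small status-code classifier. This is a minimal illustration, not the XDK's implementation: the local ErrorKind enum only mirrors the StreamErrorType names, and classify_status, is_retryable, and RETRYABLE are hypothetical names introduced here. Treating any status outside 4xx/5xx as fatal is also an assumption.

```python
from enum import Enum


class ErrorKind(Enum):
    """Local stand-in mirroring the StreamErrorType values listed above."""
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"
    RATE_LIMITED = "rate_limited"
    STREAM_INTERRUPTED = "stream_interrupted"
    AUTHENTICATION_ERROR = "authentication_error"
    CLIENT_ERROR = "client_error"
    FATAL_ERROR = "fatal_error"


# The five kinds the document lists as retryable.
RETRYABLE = frozenset({
    ErrorKind.CONNECTION_ERROR,
    ErrorKind.TIMEOUT,
    ErrorKind.SERVER_ERROR,
    ErrorKind.RATE_LIMITED,
    ErrorKind.STREAM_INTERRUPTED,
})


def classify_status(status_code: int) -> ErrorKind:
    """Map an HTTP status code to an error kind (hypothetical helper)."""
    if status_code == 429:
        return ErrorKind.RATE_LIMITED
    if status_code in (401, 403):
        return ErrorKind.AUTHENTICATION_ERROR
    if 500 <= status_code <= 599:
        return ErrorKind.SERVER_ERROR
    if 400 <= status_code <= 499:
        return ErrorKind.CLIENT_ERROR
    # Assumption: anything else is unexpected and therefore fatal.
    return ErrorKind.FATAL_ERROR


def is_retryable(kind: ErrorKind) -> bool:
    """Return True if the kind belongs to the retryable group."""
    return kind in RETRYABLE
```

Checking 429 before the general 4xx range matters: otherwise rate limiting would be misclassified as a non-retryable client error.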

StreamState

Internal state tracking for streaming connections:
@dataclass
class StreamState:
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False
    total_items_received: int = 0
    last_error: Optional[StreamError] = None
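
To make the role of these fields concrete, here is a rough sketch of a reconnect loop that updates them. It is not the XDK's internal loop: State is a local stand-in with the same fields as StreamState, consume and its sleep parameter are hypothetical, only ConnectionError is treated as retryable, and resetting the counters after a successful read is an assumption.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, Optional


@dataclass
class State:
    """Local stand-in with the same fields as StreamState above."""
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False
    total_items_received: int = 0
    last_error: Optional[Exception] = None


def consume(
    connect: Callable[[], Iterable[str]],
    state: State,
    max_retries: int = 3,
    initial_backoff: float = 1.0,
    backoff_multiplier: float = 2.0,
    max_backoff: float = 64.0,
    sleep: Callable[[float], None] = lambda seconds: None,  # pass time.sleep in real use
) -> Iterator[str]:
    """Yield items from connect(), reconnecting on ConnectionError."""
    while True:
        try:
            stream = connect()
            state.is_connected = True
            for item in stream:
                # Assumption: a successful read resets the retry bookkeeping.
                state.retry_count = 0
                state.current_backoff = initial_backoff
                state.total_items_received += 1
                yield item
            state.is_connected = False
            return  # the stream ended normally
        except ConnectionError as exc:
            state.is_connected = False
            state.last_error = exc
            # -1 means retry forever; otherwise give up once the budget is spent.
            if max_retries != -1 and state.retry_count >= max_retries:
                raise
            sleep(state.current_backoff)
            state.retry_count += 1
            state.current_backoff = min(
                state.current_backoff * backoff_multiplier, max_backoff
            )
```

Injecting sleep keeps the loop testable without real delays; jitter is left out here to keep the state transitions easy to follow.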

Basic Streaming

Use the streaming methods provided by endpoint clients:
from xdk import Client
from xdk.streaming import StreamConfig

client = Client(bearer_token="your_token")

# Configure streaming behavior
config = StreamConfig(
    max_retries=20,
    initial_backoff=2.0,
    on_connect=lambda: print("Stream connected"),
    on_error=lambda e: print(f"Stream error: {e.message}")
)

# Start streaming
for event in client.stream.sample_stream(config=config):
    print(f"Received: {event}")

Filtered Stream Example

from xdk import Client
from xdk.streaming import StreamConfig, StreamError, StreamErrorType

client = Client(bearer_token="your_token")

# Add rules for filtering
client.stream.add_or_update_rules({
    "add": [
        {"value": "python programming", "tag": "python"},
        {"value": "machine learning", "tag": "ml"}
    ]
})

# Configure with lifecycle callbacks
def on_connect():
    print("Connected to filtered stream")

def on_disconnect(exc):
    if exc:
        print(f"Disconnected due to: {exc}")
    else:
        print("Stream ended normally")

def on_reconnect(attempt, delay):
    print(f"Reconnecting in {delay:.1f}s (attempt {attempt})")

def on_error(error):
    print(f"Error [{error.error_type.value}]: {error.message}")
    if error.status_code:
        print(f"HTTP {error.status_code}")

config = StreamConfig(
    max_retries=-1,  # Infinite retries
    on_connect=on_connect,
    on_disconnect=on_disconnect,
    on_reconnect=on_reconnect,
    on_error=on_error
)

# Stream matching posts
try:
    for post in client.stream.search_stream(config=config):
        print(f"{post.data.author_id}: {post.data.text}")
        
        # Access matching rules
        if hasattr(post, 'matching_rules'):
            for rule in post.matching_rules:
                print(f"  Matched rule: {rule.tag}")
                
except StreamError as e:
    if e.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        print("Authentication failed - check your credentials")
    elif e.error_type == StreamErrorType.RATE_LIMITED:
        print("Rate limited - wait before reconnecting")
    else:
        print(f"Stream failed: {e.message}")

Advanced Error Handling

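import logging

from xdk import Client
from xdk.streaming import StreamConfig, StreamError, StreamErrorType

client = Client(bearer_token="your_token")

def process_event(event):
    # Placeholder for your own processing logic
    pass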

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Track errors for monitoring
error_counts = {
    StreamErrorType.CONNECTION_ERROR: 0,
    StreamErrorType.TIMEOUT: 0,
    StreamErrorType.RATE_LIMITED: 0,
}

def handle_error(error: StreamError):
    logger.error(f"Stream error: {error.message}")
    
    if error.error_type in error_counts:
        error_counts[error.error_type] += 1
    
    # Log additional context
    if error.status_code:
        logger.error(f"HTTP Status: {error.status_code}")
    if error.response_body:
        logger.error(f"Response: {error.response_body[:200]}")
    
    # Custom handling based on error type
    if error.error_type == StreamErrorType.RATE_LIMITED:
        logger.warning("Rate limited - consider reducing request rate")
    elif error.error_type == StreamErrorType.AUTHENTICATION_ERROR:
        logger.critical("Auth failed - check credentials and scopes")
        # Could trigger alert/notification here

config = StreamConfig(
    max_retries=50,
    initial_backoff=5.0,
    max_backoff=300.0,  # Max 5 minutes
    on_error=handle_error
)

try:
    for event in client.stream.sample_stream(config=config):
        process_event(event)
except StreamError as e:
    logger.error(f"Fatal stream error: {e}")
    logger.info(f"Error statistics: {error_counts}")

Exponential Backoff

The streaming system uses exponential backoff with jitter:
import random

# Backoff calculation (retry_count starts at 0 for the first retry):
backoff = initial_backoff * (backoff_multiplier ** retry_count)
backoff = min(backoff, max_backoff)

if jitter:
    # Jitter is added after the cap, so a delay can exceed max_backoff by up to 25%
    jitter_amount = backoff * random.uniform(0, 0.25)
    backoff += jitter_amount

Example progression (initial_backoff=1.0, backoff_multiplier=2.0, max_backoff=64.0, jitter=True):
  • Attempt 1: ~1.0-1.25s
  • Attempt 2: ~2.0-2.5s
  • Attempt 3: ~4.0-5.0s
  • Attempt 4: ~8.0-10.0s
  • Attempt 5: ~16.0-20.0s
  • Attempt 6: ~32.0-40.0s
  • Attempt 7+: ~64.0-80.0s (base delay capped at max_backoff, plus jitter)
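
The progression above can be reproduced with a small standalone function. This is a sketch of the formula as documented, not the XDK's own code: backoff_delay, schedule, and the rng parameter are hypothetical names introduced for illustration.

```python
import random
from typing import List, Optional


def backoff_delay(
    retry_count: int,
    initial_backoff: float = 1.0,
    backoff_multiplier: float = 2.0,
    max_backoff: float = 64.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> float:
    """Delay in seconds before retry number retry_count (0-based)."""
    base = min(initial_backoff * (backoff_multiplier ** retry_count), max_backoff)
    if not jitter:
        return base
    source = rng or random
    # Jitter adds 0-25% on top of the capped base delay.
    return base + base * source.uniform(0, 0.25)


def schedule(attempts: int, **kwargs) -> List[float]:
    """Delays for the first `attempts` retries."""
    return [backoff_delay(n, **kwargs) for n in range(attempts)]


print(schedule(8, jitter=False))
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 64.0]
```

Passing a seeded random.Random as rng makes the jittered delays reproducible, which helps when testing retry logic.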

Custom Stream Processing

import time

from xdk import Client
from xdk.streaming import StreamConfig, StreamError

client = Client(bearer_token="your_token")

class StreamProcessor:
    def __init__(self, client):
        self.client = client
        self.processed_count = 0
        self.error_count = 0
    
    def on_connect(self):
        print(f"Stream connected at {time.time()}")
    
    def on_disconnect(self, exc):
        print(f"Disconnected after {self.processed_count} items")
    
    def on_error(self, error):
        self.error_count += 1
        if error.is_retryable:
            print(f"Retryable error #{self.error_count}: {error.message}")
        else:
            print(f"Fatal error: {error.message}")
    
    def process_stream(self):
        config = StreamConfig(
            max_retries=-1,
            on_connect=self.on_connect,
            on_disconnect=self.on_disconnect,
            on_error=self.on_error,
            timeout=90.0  # 90 second timeout
        )
        
        try:
            for event in self.client.stream.sample_stream(config=config):
                self.process_event(event)
                self.processed_count += 1
                
                # Periodic status
                if self.processed_count % 1000 == 0:
                    print(f"Processed {self.processed_count} events")
        
        except KeyboardInterrupt:
            print(f"\nStopped after {self.processed_count} events")
        except StreamError as e:
            print(f"Stream failed: {e}")
    
    def process_event(self, event):
        # Your processing logic
        pass

# Usage
processor = StreamProcessor(client)
processor.process_stream()

Best Practices

Configure Appropriate Retries

Set max_retries=-1 for long-running streams, or use finite retries for batch processing.
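
When choosing a finite max_retries, it can help to estimate how long the stream might spend waiting before it gives up. The sketch below sums the documented backoff formula with maximum jitter; worst_case_wait is a hypothetical helper, and the estimate ignores the time each connection attempt itself takes.

```python
def worst_case_wait(
    max_retries: int,
    initial_backoff: float = 1.0,
    backoff_multiplier: float = 2.0,
    max_backoff: float = 64.0,
    jitter: bool = True,
) -> float:
    """Upper bound, in seconds, on total backoff time across all retries."""
    if max_retries < 0:
        raise ValueError("infinite retries have no upper bound")
    jitter_factor = 1.25 if jitter else 1.0  # jitter adds at most 25%
    total = 0.0
    for n in range(max_retries):
        base = min(initial_backoff * (backoff_multiplier ** n), max_backoff)
        total += base * jitter_factor
    return total


print(worst_case_wait(10))
# → 398.75
```

With the default settings, ten retries therefore wait for at most about 6.6 minutes in total before the error is raised.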

Use Lifecycle Callbacks

Implement callbacks for monitoring, logging, and alerting on stream health.

Handle Non-Retryable Errors

Catch authentication and client errors separately - they require different handling than connection issues.

Set Reasonable Timeouts

Configure timeouts based on your use case - longer for stable connections, shorter for quick failure detection.

Monitor Error Patterns

Track error types and frequencies to identify infrastructure issues or API problems.
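
One way to track both lifetime totals and recent frequency is a sliding-window counter. The ErrorMonitor class below is a hypothetical sketch, not part of the XDK; it takes plain strings so that it runs on its own, and the injectable clock exists only to make it testable.

```python
import time
from collections import Counter, deque
from typing import Callable, Deque, Tuple


class ErrorMonitor:
    """Count errors by type, overall and within a recent time window."""

    def __init__(
        self,
        window_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.window_seconds = window_seconds
        self.clock = clock
        self.totals: Counter = Counter()
        self._recent: Deque[Tuple[float, str]] = deque()

    def record(self, error_type: str) -> None:
        """Register one error of the given type at the current time."""
        now = self.clock()
        self.totals[error_type] += 1
        self._recent.append((now, error_type))
        self._evict(now)

    def recent_counts(self) -> Counter:
        """Per-type counts for errors seen within the window."""
        self._evict(self.clock())
        return Counter(kind for _, kind in self._recent)

    def _evict(self, now: float) -> None:
        # Drop entries that are older than the window.
        while self._recent and now - self._recent[0][0] > self.window_seconds:
            self._recent.popleft()
```

With the XDK, this could be wired in as on_error=lambda err: monitor.record(err.error_type.value), since StreamError exposes error_type as shown in the examples above.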
