Skip to main content
The streaming module provides utilities for consuming streaming endpoints with automatic reconnection, exponential backoff, and comprehensive error handling.

StreamConfig

Configuration dataclass for streaming connections with retry behavior.
@dataclass
class StreamConfig:
    """Configuration for a streaming connection and its retry behavior."""
    # Maximum number of reconnection attempts; -1 means retry indefinitely.
    max_retries: int = 10
    # Initial backoff delay, in seconds.
    initial_backoff: float = 1.0
    # Maximum backoff delay, in seconds.
    max_backoff: float = 64.0
    # Multiplier for exponential backoff.
    backoff_multiplier: float = 2.0
    # Whether to add random jitter to backoff times.
    jitter: bool = True
    # Request timeout in seconds; None means no timeout.
    timeout: Optional[float] = None
    # Size of chunks to read from the stream.
    chunk_size: int = 1024
    # Called with no arguments when a connection is established.
    on_connect: Optional[Callable[[], None]] = None
    # Called when the connection is lost; receives an Optional[Exception].
    on_disconnect: Optional[Callable[[Optional[Exception]], None]] = None
    # Called when a reconnection is attempted; receives (attempt, delay).
    on_reconnect: Optional[Callable[[int, float], None]] = None
    # Called with the StreamError when an error occurs.
    on_error: Optional[Callable[[StreamError], None]] = None

Attributes

max_retries
int
default:"10"
Maximum number of reconnection attempts. Set to -1 for infinite retries.
initial_backoff
float
default:"1.0"
Initial backoff delay in seconds
max_backoff
float
default:"64.0"
Maximum backoff delay in seconds
backoff_multiplier
float
default:"2.0"
Multiplier for exponential backoff
jitter
bool
default:"True"
Whether to add random jitter to backoff times
timeout
float
default:"None"
Request timeout in seconds. None for no timeout.
chunk_size
int
default:"1024"
Size of chunks to read from stream
on_connect
Callable
default:"None"
Optional callback when connection is established. Called with no arguments
on_disconnect
Callable
default:"None"
Optional callback when connection is lost. Receives (error: Optional[Exception])
on_reconnect
Callable
default:"None"
Optional callback when reconnection is attempted. Receives (attempt: int, delay: float)
on_error
Callable
default:"None"
Optional callback when an error occurs. Receives (error: StreamError)

Example Usage

from xdk.streaming import StreamConfig

def on_connect():
    """Announce that the stream connection has been established."""
    print("Connected to stream")


def on_disconnect(error):
    """Report a lost connection together with the error passed in."""
    print(f"Disconnected: {error}")


def on_reconnect(attempt, delay):
    """Report the upcoming reconnection attempt and its backoff delay."""
    print(f"Reconnecting (attempt {attempt}) in {delay:.2f}s")


def on_error(error):
    """Report a stream error, prefixed with its error type."""
    print(f"Error: {error.error_type} - {error}")


# Retry tuning and lifecycle callbacks are collected separately, then merged
# into a single StreamConfig.
retry_options = {
    "max_retries": 20,
    "initial_backoff": 2.0,
    "max_backoff": 120.0,
}
lifecycle_callbacks = {
    "on_connect": on_connect,
    "on_disconnect": on_disconnect,
    "on_reconnect": on_reconnect,
    "on_error": on_error,
}
config = StreamConfig(**retry_options, **lifecycle_callbacks)

StreamState

Internal state dataclass for tracking streaming connection status.
@dataclass
class StreamState:
    """Internal state for tracking the status of a streaming connection."""
    # Number of retry attempts made.
    retry_count: int = 0
    # Current backoff delay, in seconds.
    current_backoff: float = 1.0
    # Whether the stream is currently connected.
    is_connected: bool = False
    # Total number of items received from the stream.
    total_items_received: int = 0
    # Last error encountered, if any.
    last_error: Optional[StreamError] = None

Attributes

retry_count
int
default:"0"
Number of retry attempts made
current_backoff
float
default:"1.0"
Current backoff delay in seconds
is_connected
bool
default:"False"
Whether currently connected to the stream
total_items_received
int
default:"0"
Total number of items received from the stream
last_error
StreamError
default:"None"
Last error encountered, if any

StreamError

Exception class for streaming errors with classification and retry logic.
# Exception for streaming errors; carries a classification used for retry decisions.
class StreamError(Exception):
    def __init__(
        self,
        message: str,  # Error message describing what went wrong
        error_type: StreamErrorType,  # Classification of the error for retry decisions
        original_exception: Optional[Exception] = None,  # Wrapped original exception, if any
        status_code: Optional[int] = None,  # HTTP status code, if applicable
        response_body: Optional[str] = None,  # Response body from the server, if available
    )

Attributes

message
str
required
Error message describing what went wrong
error_type
StreamErrorType
required
Classification of the error for retry decisions
original_exception
Exception
default:"None"
The original exception that was wrapped, if any
status_code
int
default:"None"
HTTP status code if applicable
response_body
str
default:"None"
Response body from the server if available

Properties

is_retryable

Check if this error type should be retried.
# True if this error's type should be retried, False otherwise.
@property
def is_retryable(self) -> bool
Returns: True if the error is retryable, False otherwise

Example Usage

from xdk.streaming import StreamError, StreamErrorType

try:
    # streaming code
    pass
except StreamError as err:
    # Show the classification, the retry verdict and the HTTP status.
    print(f"Error type: {err.error_type}")
    print(f"Is retryable: {err.is_retryable}")
    print(f"Status code: {err.status_code}")
    # The wrapped exception is optional, so only report it when present.
    cause = err.original_exception
    if cause:
        print(f"Original: {cause}")

StreamErrorType

Enumeration classifying streaming errors for retry decisions.
class StreamErrorType(Enum):
    """Classification of streaming errors, used for retry decisions."""
    # Retryable errors - connection issues that may resolve
    CONNECTION_ERROR = "connection_error"  # Network errors, connection reset, etc.
    TIMEOUT = "timeout"  # Request or read timeout
    SERVER_ERROR = "server_error"  # 5xx errors
    RATE_LIMITED = "rate_limited"  # 429 errors
    STREAM_INTERRUPTED = "stream_interrupted"  # Stream was interrupted mid-transfer

    # Non-retryable errors - client issues that won't resolve with retry
    AUTHENTICATION_ERROR = "authentication_error"  # 401/403
    CLIENT_ERROR = "client_error"  # Other 4xx errors
    FATAL_ERROR = "fatal_error"  # Fatal errors that cannot be recovered

Retryable Error Types

CONNECTION_ERROR
str
Connection issues (network errors, connection reset, etc.)
TIMEOUT
str
Request or read timeout
SERVER_ERROR
str
Server errors (5xx HTTP status codes)
RATE_LIMITED
str
Rate limit errors (429 HTTP status code)
STREAM_INTERRUPTED
str
Stream was interrupted mid-transfer

Non-Retryable Error Types

AUTHENTICATION_ERROR
str
Authentication failures (401/403 HTTP status codes)
CLIENT_ERROR
str
Other client errors (4xx HTTP status codes)
FATAL_ERROR
str
Fatal errors that cannot be recovered

stream_with_retry()

Stream data from an endpoint with automatic reconnection and exponential backoff.
# Stream data from an endpoint with automatic reconnection and exponential backoff.
# Yields parsed JSON objects, optionally validated as Pydantic models.
# Raises StreamError on a non-retryable error or when max retries are exceeded.
def stream_with_retry(
    session: requests.Session,  # The requests Session to use for HTTP calls
    method: str,  # HTTP method (typically "get")
    url: str,  # The full URL to stream from
    config: Optional[StreamConfig] = None,  # Retry and callback settings
    params: Optional[Dict[str, Any]] = None,  # Query parameters for the request
    headers: Optional[Dict[str, str]] = None,  # HTTP headers for the request
    json_data: Optional[Dict[str, Any]] = None,  # JSON body data for the request
    response_model: Optional[Type[T]] = None,  # Optional Pydantic model class to validate responses
) -> Generator[Union[T, Dict[str, Any]], None, None]
session
requests.Session
required
The requests Session to use for HTTP calls
method
str
required
HTTP method (typically "get")
url
str
required
The full URL to stream from
config
StreamConfig
default:"None"
StreamConfig with retry and callback settings
params
Dict[str, Any]
default:"None"
Query parameters for the request
headers
Dict[str, str]
default:"None"
HTTP headers for the request
json_data
Dict[str, Any]
default:"None"
JSON body data for the request
response_model
Type[T]
default:"None"
Optional Pydantic model class to validate responses
Yields: Parsed JSON objects from the stream, optionally validated as Pydantic models.
Raises:
  • StreamError - When a non-retryable error occurs or max retries exceeded

Example Usage

import requests
from xdk.streaming import StreamConfig, StreamError, stream_with_retry

config = StreamConfig(
    max_retries=20,
    initial_backoff=2.0,
    on_connect=lambda: print("Connected"),
)

# Use the session as a context manager so it is closed when streaming ends,
# whether the loop finishes normally or an exception propagates.
with requests.Session() as session:
    session.headers["Authorization"] = "Bearer your_token"

    try:
        for item in stream_with_retry(
            session,
            "get",
            "https://api.x.com/2/tweets/sample/stream",
            config=config,
        ):
            print(item)
    except StreamError as e:
        # stream_with_retry raises StreamError on a non-retryable error or
        # once max retries are exceeded; report it instead of crashing.
        print(f"Stream failed: {e.error_type} - {e}")

Build docs developers (and LLMs) love