The streaming module provides utilities for consuming streaming endpoints with automatic reconnection, exponential backoff, and errors classified as retryable or non-retryable.
StreamConfig
Configuration dataclass for streaming connections with retry behavior.
@dataclass
class StreamConfig:
    max_retries: int = 10
    initial_backoff: float = 1.0
    max_backoff: float = 64.0
    backoff_multiplier: float = 2.0
    jitter: bool = True
    timeout: Optional[float] = None
    chunk_size: int = 1024
    on_connect: Optional[Callable[[], None]] = None
    on_disconnect: Optional[Callable[[Optional[Exception]], None]] = None
    on_reconnect: Optional[Callable[[int, float], None]] = None
    on_error: Optional[Callable[[StreamError], None]] = None
Attributes
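max_retries (int, default: 10): Maximum number of reconnection attempts. Set to -1 for infinite retries.
initial_backoff (float, default: 1.0): Initial backoff delay in seconds.
max_backoff (float, default: 64.0): Maximum backoff delay in seconds.
backoff_multiplier (float, default: 2.0): Multiplier for exponential backoff.
jitter (bool, default: True): Whether to add random jitter to backoff times.
timeout (Optional[float], default: None): Request timeout in seconds. None for no timeout.
chunk_size (int, default: 1024): Size of chunks to read from the stream.
on_connect (Optional[Callable[[], None]], default: None): Optional callback when the connection is established.
on_disconnect (Optional[Callable[[Optional[Exception]], None]], default: None): Optional callback when the connection is lost.
on_reconnect (Optional[Callable[[int, float], None]], default: None): Optional callback when reconnection is attempted. Receives (attempt: int, delay: float).
on_error (Optional[Callable[[StreamError], None]], default: None): Optional callback when an error occurs.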
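The backoff attributes interact as follows in a typical exponential backoff scheme. The sketch below is illustrative only: backoff_delays is a hypothetical helper, not part of xdk.streaming, and the jitter formula (scaling each delay by a random factor between 0.5 and 1.0) is an assumption, since the reference does not state how the library applies jitter.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    initial_backoff: float = 1.0,
    max_backoff: float = 64.0,
    backoff_multiplier: float = 2.0,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the delay in seconds before each of the first `attempts` retries."""
    rng = rng or random.Random()
    delays = []
    delay = initial_backoff
    for _ in range(attempts):
        # Never wait longer than max_backoff.
        capped = min(delay, max_backoff)
        if jitter:
            # Assumed jitter scheme: scale by a random factor in [0.5, 1.0].
            capped *= rng.uniform(0.5, 1.0)
        delays.append(capped)
        # Grow the uncapped delay for the next attempt.
        delay *= backoff_multiplier
    return delays


# With the documented defaults and jitter disabled:
print(backoff_delays(8, jitter=False))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 64.0]
```

With the default values the delay doubles on each attempt until it reaches max_backoff on the seventh retry, then stays there.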
Example Usage
from xdk.streaming import StreamConfig

def on_connect():
    print("Connected to stream")

def on_disconnect(error):
    print(f"Disconnected: {error}")

def on_reconnect(attempt, delay):
    print(f"Reconnecting (attempt {attempt}) in {delay:.2f}s")

def on_error(error):
    print(f"Error: {error.error_type} - {error}")

config = StreamConfig(
    max_retries=20,
    initial_backoff=2.0,
    max_backoff=120.0,
    on_connect=on_connect,
    on_disconnect=on_disconnect,
    on_reconnect=on_reconnect,
    on_error=on_error,
)
StreamState
Internal state dataclass for tracking streaming connection status.
@dataclass
class StreamState:
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False
    total_items_received: int = 0
    last_error: Optional[StreamError] = None
Attributes
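retry_count (int, default: 0): Number of retry attempts made.
current_backoff (float, default: 1.0): Current backoff delay in seconds.
is_connected (bool, default: False): Whether currently connected to the stream.
total_items_received (int, default: 0): Total number of items received from the stream.
last_error (Optional[StreamError], default: None): Last error encountered, if any.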
StreamError
Exception class for streaming errors with classification and retry logic.
class StreamError(Exception):
    def __init__(
        self,
        message: str,
        error_type: StreamErrorType,
        original_exception: Optional[Exception] = None,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
    )
Attributes
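message (str): Error message describing what went wrong.
error_type (StreamErrorType): Classification of the error for retry decisions.
original_exception (Optional[Exception], default: None): The original exception that was wrapped, if any.
status_code (Optional[int], default: None): HTTP status code, if applicable.
response_body (Optional[str], default: None): Response body from the server, if available.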
Properties
is_retryable
Check if this error type should be retried.
@property
def is_retryable(self) -> bool
Returns: True if the error is retryable, False otherwise
Example Usage
from xdk.streaming import StreamError, StreamErrorType

try:
    # streaming code
    pass
except StreamError as e:
    print(f"Error type: {e.error_type}")
    print(f"Is retryable: {e.is_retryable}")
    print(f"Status code: {e.status_code}")
    if e.original_exception:
        print(f"Original: {e.original_exception}")
StreamErrorType
Enumeration classifying streaming errors for retry decisions.
class StreamErrorType(Enum):
    # Retryable errors - connection issues that may resolve
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"  # 5xx errors
    RATE_LIMITED = "rate_limited"  # 429 errors
    STREAM_INTERRUPTED = "stream_interrupted"
    # Non-retryable errors - client issues that won't resolve with retry
    AUTHENTICATION_ERROR = "authentication_error"  # 401/403
    CLIENT_ERROR = "client_error"  # Other 4xx errors
    FATAL_ERROR = "fatal_error"
Retryable Error Types
CONNECTION_ERROR: Connection issues (network errors, connection reset, etc.)
TIMEOUT: Request timeouts
SERVER_ERROR: Server errors (5xx HTTP status codes)
RATE_LIMITED: Rate limit errors (429 HTTP status code)
STREAM_INTERRUPTED: Stream was interrupted mid-transfer
Non-Retryable Error Types
AUTHENTICATION_ERROR: Authentication failures (401/403 HTTP status codes)
CLIENT_ERROR: Other client errors (4xx HTTP status codes)
FATAL_ERROR: Fatal errors that cannot be recovered
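The mapping from HTTP status codes to error types, and from error types to a retry decision, can be sketched as follows. This is a minimal illustration built from the status code comments in the enum above, not the library's actual implementation: the enum is redeclared locally so the snippet runs on its own, and classify_status, RETRYABLE_TYPES, and the free function is_retryable are hypothetical names.

```python
from enum import Enum
from typing import Optional


class StreamErrorType(Enum):
    # Local stand-in mirroring the documented enum.
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"
    RATE_LIMITED = "rate_limited"
    STREAM_INTERRUPTED = "stream_interrupted"
    AUTHENTICATION_ERROR = "authentication_error"
    CLIENT_ERROR = "client_error"
    FATAL_ERROR = "fatal_error"


# The five types documented as retryable.
RETRYABLE_TYPES = frozenset({
    StreamErrorType.CONNECTION_ERROR,
    StreamErrorType.TIMEOUT,
    StreamErrorType.SERVER_ERROR,
    StreamErrorType.RATE_LIMITED,
    StreamErrorType.STREAM_INTERRUPTED,
})


def classify_status(status_code: int) -> Optional[StreamErrorType]:
    """Map an HTTP status code to an error type, or None if it is not an error."""
    if status_code == 429:
        return StreamErrorType.RATE_LIMITED
    if status_code in (401, 403):
        return StreamErrorType.AUTHENTICATION_ERROR
    if 400 <= status_code < 500:
        return StreamErrorType.CLIENT_ERROR
    if 500 <= status_code < 600:
        return StreamErrorType.SERVER_ERROR
    return None


def is_retryable(error_type: StreamErrorType) -> bool:
    """Return True if the error type belongs to the retryable group."""
    return error_type in RETRYABLE_TYPES


for code in (429, 503, 401, 404):
    error_type = classify_status(code)
    print(code, error_type.name, is_retryable(error_type))
```

The 429 and 401/403 checks come before the generic 4xx branch, since those codes would otherwise be classified as CLIENT_ERROR.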
stream_with_retry()
Stream data from an endpoint with automatic reconnection and exponential backoff.
def stream_with_retry(
    session: requests.Session,
    method: str,
    url: str,
    config: Optional[StreamConfig] = None,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    json_data: Optional[Dict[str, Any]] = None,
    response_model: Optional[Type[T]] = None,
) -> Generator[Union[T, Dict[str, Any]], None, None]
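Parameters
session (requests.Session): The requests Session to use for HTTP calls.
method (str): HTTP method (typically "get").
url (str): The full URL to stream from.
config (Optional[StreamConfig], default: None): StreamConfig with retry and callback settings.
params (Optional[Dict[str, Any]], default: None): Query parameters for the request.
headers (Optional[Dict[str, str]], default: None): HTTP headers for the request.
json_data (Optional[Dict[str, Any]], default: None): JSON body data for the request.
response_model (Optional[Type[T]], default: None): Optional Pydantic model class to validate responses.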
Yields: Parsed JSON objects from the stream, optionally validated as Pydantic models
Raises:
StreamError - When a non-retryable error occurs or the maximum number of retries is exceeded
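The retry behavior described above can be pictured as a generator wrapped around a reconnect loop. The sketch below is an assumption-laden illustration, not the library's code: stream_with_retry_sketch, RetryableError, NonRetryableError, and the connect callable are hypothetical stand-ins, and both resetting the retry state after a received item and stopping on a clean end of stream are assumed behaviors.

```python
import time
from typing import Callable, Iterable, Iterator


class RetryableError(Exception):
    """Stand-in for a StreamError whose is_retryable is True."""


class NonRetryableError(Exception):
    """Stand-in for a StreamError whose is_retryable is False."""


def stream_with_retry_sketch(
    connect: Callable[[], Iterable[dict]],
    max_retries: int = 10,
    initial_backoff: float = 1.0,
    max_backoff: float = 64.0,
    backoff_multiplier: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[dict]:
    retry_count = 0
    backoff = initial_backoff
    while True:
        try:
            for item in connect():
                # Assumption: a received item resets the retry state.
                retry_count = 0
                backoff = initial_backoff
                yield item
            return  # Assumption: a clean end of stream stops the generator.
        except NonRetryableError:
            raise  # Surface immediately, without retrying.
        except RetryableError:
            # max_retries == -1 means retry forever.
            if max_retries != -1 and retry_count >= max_retries:
                raise
            retry_count += 1
            sleep(min(backoff, max_backoff))
            backoff *= backoff_multiplier


# A connection that fails twice before delivering two items.
calls = {"n": 0}

def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RetryableError("connection reset")
    return [{"id": 1}, {"id": 2}]

delays = []
items = list(stream_with_retry_sketch(flaky_connect, sleep=delays.append))
print(items)   # [{'id': 1}, {'id': 2}]
print(delays)  # [1.0, 2.0]
```

Passing sleep as a parameter keeps the demo instant and makes the delay sequence observable; real code would normally leave it as time.sleep.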
Example Usage
import requests
from xdk.streaming import stream_with_retry, StreamConfig

session = requests.Session()
session.headers["Authorization"] = "Bearer your_token"

config = StreamConfig(
    max_retries=20,
    initial_backoff=2.0,
    on_connect=lambda: print("Connected"),
)

for item in stream_with_retry(
    session,
    "get",
    "https://api.x.com/2/tweets/sample/stream",
    config=config,
):
    print(item)