Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

PHP Client

The official PHP client for Durable Streams protocol with PSR-18 HTTP client support.

Installation

composer require durable-streams/client
Requires PHP 8.1 or later

Quick Start

<?php
use DurableStreams\DurableStream;
use DurableStreams\LiveMode;

// Create a stream
$stream = DurableStream::create(
    'https://streams.example.com/my-stream',
    contentType: 'application/json'
);

// Append data
$stream->append(json_encode(['message' => 'hello']));

// Read data
$response = $stream->read(offset: '-1', live: LiveMode::LongPoll);
foreach ($response->iterJson() as $item) {
    echo "Item: " . json_encode($item) . "\n";
}

Core APIs

DurableStream

Handle for interacting with a durable stream.

Static Factory Methods

DurableStream::create()
static
Create a new stream
url
string
required
Stream URL
contentType
string
default:"'application/octet-stream'"
Content type
headers
array<string, string>
default:"[]"
Additional headers
ttlSeconds
int|null
Time-to-live in seconds
expiresAt
string|null
Absolute expiry (ISO 8601)
closed
bool
default:"false"
Create in closed state
data
string|null
Optional initial data
client
HttpClientInterface|null
Optional HTTP client (PSR-18)
Returns: DurableStreamThrows: StreamExistsException if stream already exists
DurableStream::connect()
static
Connect to an existing stream (validates via HEAD)
url
string
required
Stream URL
headers
array<string, string>
default:"[]"
Additional headers
client
HttpClientInterface|null
Optional HTTP client
Returns: DurableStreamThrows: StreamNotFoundException if stream doesn’t exist
Example:
$stream = DurableStream::create(
    'https://streams.example.com/events',
    contentType: 'application/json',
    ttlSeconds: 86400,  // 24 hours
    headers: ['Authorization' => 'Bearer token123']
);

Write Operations

append()
method
Append data to the stream
data
string
required
Data to append (for JSON streams, pass pre-encoded JSON string)
seq
string|null
Writer coordination sequence
extraHeaders
array<string, string>
default:"[]"
Additional headers for this request
Returns: AppendResult with offset and duplicateThrows: StreamClosedException, SeqConflictException, DurableStreamException
Example:
// JSON stream - pass pre-encoded JSON
$result = $stream->append(json_encode(['message' => 'hello']));
echo "Offset: {$result->offset}\n";

// With sequence for coordination
$result = $stream->append(
    json_encode(['message' => 'world']),
    seq: 'seq-001'
);

// Byte stream
$result = $stream->append('raw text data');
closeStream()
method
Close the stream permanently (no more appends)
data
string|null
Optional final message
contentType
string|null
Content type override
extraHeaders
array<string, string>
default:"[]"
Additional headers
Returns: CloseResult with finalOffsetThrows: StreamClosedException if called with data on already-closed stream
delete()
method
Delete this streamThrows: StreamNotFoundException if stream doesn’t exist
DurableStream::deleteStatic()
static
Delete a stream without creating a handle
url
string
required
Stream URL
headers
array<string, string>|null
Additional headers
client
HttpClientInterface|null
Optional HTTP client

Read Operations

read()
method
Read from the stream
offset
string
default:"'-1'"
Starting offset
live
LiveMode
default:"LiveMode::Off"
Live mode: LiveMode::Off, LiveMode::LongPoll, LiveMode::SSE
extraHeaders
array<string, string>
default:"[]"
Additional headers
timeout
float|null
Timeout in seconds
Returns: StreamResponse
Example:
use DurableStreams\LiveMode;

$response = $stream->read(
    offset: $lastOffset,
    live: LiveMode::LongPoll
);

// Iterate over JSON items
foreach ($response->iterJson() as $item) {
    echo "Item: " . json_encode($item) . "\n";
}

// Or get all items at once
$items = $response->jsonAll();
foreach ($items as $item) {
    processItem($item);
}

// Save checkpoint
$checkpoint = $response->offset();
saveOffset($checkpoint);
head()
method
Get stream metadata via HEAD requestReturns: HeadResult with offset, contentType, streamClosed
DurableStream::headStatic()
static
Get metadata without creating a handle
url
string
required
Stream URL
headers
array<string, string>|null
Additional headers
client
HttpClientInterface|null
Optional HTTP client
Returns: HeadResult

StreamResponse

Response object for reading stream data.
iterJson()
method
Iterate over individual JSON itemsReturns: Iterator<mixed>
iterBytes()
method
Iterate over byte chunksReturns: Iterator<string>
jsonAll()
method
Get all JSON items as arrayReturns: array<mixed>
readAll()
method
Get all bytesReturns: string
offset()
method
Get current stream offsetReturns: string
upToDate()
method
Check if we’ve reached the current end of streamReturns: bool

IdempotentProducer

Exactly-once writes with automatic batching.
use DurableStreams\IdempotentProducer;

$producer = new IdempotentProducer(
    url: 'https://streams.example.com/orders',
    producerId: 'order-service-1',
    epoch: 0,
    autoClaim: true,
    maxBatchBytes: 1024 * 1024,
    lingerMs: 5
);

// Fire-and-forget writes
$producer->append(json_encode(['orderId' => '123']));
$producer->append(json_encode(['orderId' => '456']));

// Ensure delivery
$producer->flush();
__construct()
constructor
Create an idempotent producer
url
string
required
Stream URL
producerId
string
required
Stable identifier
epoch
int
default:"0"
Starting epoch
autoClaim
bool
default:"false"
Auto-retry with epoch+1 on 403
maxBatchBytes
int
default:"1048576"
Max batch size (1MB)
lingerMs
int
default:"5"
Max wait before sending batch
contentType
string
default:"'application/octet-stream'"
Content type
client
HttpClientInterface|null
Optional HTTP client
append()
method
Fire-and-forget append (returns immediately)
data
string
required
Data to append (for JSON, pass pre-encoded JSON)
flush()
method
Send pending batch and wait for completion

Types

LiveMode

enum LiveMode: string
{
    case Off = 'off';
    case LongPoll = 'long-poll';
    case SSE = 'sse';
}

AppendResult

final class AppendResult
{
    public readonly string $offset;
    public readonly int $status;
    public readonly bool $duplicate;
}

HeadResult

final class HeadResult
{
    public readonly string $offset;
    public readonly ?string $contentType;
    public readonly bool $streamClosed;
}

CloseResult

final class CloseResult
{
    public readonly string $finalOffset;
}

Error Handling

use DurableStreams\Exception\{
    DurableStreamException,
    StreamNotFoundException,
    StreamClosedException,
    StreamExistsException,
    SeqConflictException
};

try {
    $stream->append($data);
} catch (StreamClosedException $e) {
    echo "Stream is closed: {$e->getUrl()}\n";
} catch (SeqConflictException $e) {
    echo "Sequence conflict\n";
} catch (StreamNotFoundException $e) {
    echo "Stream not found: {$e->getUrl()}\n";
} catch (DurableStreamException $e) {
    echo "Error {$e->getStatusCode()}: {$e->getMessage()}\n";
}

Exception Hierarchy

  • DurableStreamException - Base exception
    • StreamNotFoundException - 404
    • StreamExistsException - 409 on create
    • StreamClosedException - 409 with Stream-Closed header
    • SeqConflictException - 409 sequence conflict
    • StaleEpochException - 403 for producers
    • UnauthorizedException - 401
    • RateLimitedException - 429
    • MessageTooLargeException - 413

Advanced Features

Custom HTTP Client (PSR-18)

use GuzzleHttp\Client;
use DurableStreams\Internal\Psr18HttpClient;

$guzzle = new Client([
    'timeout' => 30,
    'connect_timeout' => 10,
]);

$httpClient = new Psr18HttpClient($guzzle);

$stream = new DurableStream(
    url: 'https://streams.example.com/events',
    client: $httpClient
);

Streaming Large Reads

$response = $stream->read(live: LiveMode::LongPoll);

foreach ($response->iterJson() as $item) {
    // Process one item at a time (memory efficient)
    processItem($item);

    // Save checkpoint periodically
    if (shouldCheckpoint()) {
        saveOffset($response->offset());
    }
}

JSON Batching

// For JSON streams, DurableStream automatically wraps data in arrays
// No need to wrap manually

$stream = DurableStream::create(
    'https://streams.example.com/events',
    contentType: 'application/json'
);

// This is automatically wrapped as [{"event":"test"}] by the client
$stream->append(json_encode(['event' => 'test']));

Helper Functions

The library provides global helper functions:
use function DurableStreams\stream;

// Quick stream creation
$response = stream([
    'url' => 'https://streams.example.com/events',
    'offset' => '-1',
    'live' => 'long-poll',
]);

foreach ($response->iterJson() as $item) {
    echo json_encode($item) . "\n";
}

Source Code

Source: packages/client-php/src/

Build docs developers (and LLMs) love