
AudioStreamer

Synchronous audio streamer that buffers generated audio chunks in per-sample queues for real-time streaming, so you can consume audio as it is produced rather than waiting for generation to complete.

Class Signature

class AudioStreamer(BaseStreamer):
    def __init__(
        self,
        batch_size: int,
        stop_signal: Optional[Any] = None,
        timeout: Optional[float] = None
    )

Initialization

  • batch_size (int, required): The batch size for generation. Currently VibeVoice only supports batch_size=1.
  • stop_signal (any, default None): The signal placed in each queue when generation ends.
  • timeout (float, optional): Timeout for queue operations in seconds. If None, queue operations block indefinitely.
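
The timeout parameter follows standard queue semantics. A minimal stdlib sketch of what a finite vs. None timeout means for a consumer (illustrative only, not the VibeVoice implementation):

```python
import queue

q = queue.Queue()

# With a finite timeout, an empty queue raises queue.Empty once it expires.
timed_out = False
try:
    q.get(timeout=0.1)
except queue.Empty:
    timed_out = True

# With timeout=None, get() blocks until a producer puts an item, which is
# why AudioStreamer relies on a stop_signal sentinel to unblock consumers
# when generation ends.
q.put("chunk-0")
first_chunk = q.get(timeout=None)
```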

Attributes

  • audio_queues (List[Queue]): List of Queue objects, one for each sample in the batch.
  • finished_flags (List[bool]): Flags indicating which samples have finished generation.
  • sample_indices_map (dict): Maps sample indices to queue indices.

Methods

put

Receives audio chunks and places them in appropriate queues.
streamer.put(audio_chunks=audio_tensor, sample_indices=indices)
  • audio_chunks (torch.Tensor, required): Tensor of shape (num_samples, ...) containing audio chunks.
  • sample_indices (torch.Tensor, required): Tensor indicating which samples these chunks belong to.

end

Signals the end of generation for specified samples or all samples.
# End all samples
streamer.end()

# End specific samples
streamer.end(sample_indices=torch.tensor([0]))
  • sample_indices (torch.Tensor, optional): Tensor of sample indices to end. If None, ends all samples.

get_stream

Get an iterator for a specific sample’s audio stream.
stream = streamer.get_stream(sample_idx=0)
for audio_chunk in stream:
    # Process audio chunk
    play_audio(audio_chunk)
  • sample_idx (int, required): Index of the sample to stream (0 to batch_size - 1).

Returns: AudioSampleIterator, an iterator that yields audio chunks for the specified sample.

__iter__

Returns an iterator over all audio streams in the batch.
for batch_chunks in streamer:
    # batch_chunks is a dict: {sample_idx: audio_chunk}
    for idx, chunk in batch_chunks.items():
        process_chunk(idx, chunk)
Returns: AudioBatchIterator, an iterator that yields dictionaries mapping sample indices to audio chunks.

Usage Example

import torch
from vibevoice import (
    VibeVoiceStreamingForConditionalGenerationInference,
    VibeVoiceStreamingProcessor
)
from vibevoice.modular.streamer import AudioStreamer

# Create streamer
streamer = AudioStreamer(batch_size=1)

# Load model and processor
model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
    "microsoft/VibeVoice-Realtime-0.5B",
    torch_dtype=torch.bfloat16,
    device_map="cuda"
)
processor = VibeVoiceStreamingProcessor.from_pretrained(
    "microsoft/VibeVoice-Realtime-0.5B"
)

# Process inputs
inputs = processor.process_input_with_cached_prompt(
    text="Hello world",
    cached_prompt=voice_prompt,
    return_tensors="pt"
)

# Start generation in separate thread
import threading

def generate():
    outputs = model.generate(
        **inputs,
        audio_streamer=streamer,
        tokenizer=processor.tokenizer,
        all_prefilled_outputs=voice_prompt
    )

gen_thread = threading.Thread(target=generate)
gen_thread.start()

# Stream audio as it's generated
for audio_chunk in streamer.get_stream(0):
    # Play or process audio chunk in real-time
    print(f"Received chunk of shape: {audio_chunk.shape}")
    # play_audio(audio_chunk)

gen_thread.join()

Real-Time Playback Example

import threading

import sounddevice as sd
from vibevoice.modular.streamer import AudioStreamer

streamer = AudioStreamer(batch_size=1)

# Start generation in background
gen_thread = threading.Thread(target=lambda: model.generate(
    **inputs,
    audio_streamer=streamer,
    tokenizer=processor.tokenizer,
    all_prefilled_outputs=voice_prompt
))
gen_thread.start()

# Play audio in real-time
for audio_chunk in streamer.get_stream(0):
    # Cast from bfloat16 to float32 before converting to NumPy
    audio_np = audio_chunk.float().cpu().numpy()
    sd.play(audio_np, samplerate=24000)
    sd.wait()  # Wait for chunk to finish playing

gen_thread.join()

AsyncAudioStreamer

Asynchronous version of AudioStreamer for use in async contexts with asyncio.

Class Signature

class AsyncAudioStreamer(AudioStreamer):
    def __init__(
        self,
        batch_size: int,
        stop_signal: Optional[Any] = None,
        timeout: Optional[float] = None
    )

Initialization

Same parameters as AudioStreamer. Must be created within a running event loop.
  • batch_size (int, required): The batch size for generation.
  • stop_signal (any, default None): Signal to indicate end of generation.
  • timeout (float, optional): Timeout for async queue operations.

Attributes

  • audio_queues (List[asyncio.Queue]): List of asyncio Queue objects for async operation.
  • loop (asyncio.AbstractEventLoop): Reference to the running event loop.

Methods

put

Puts audio chunks in async queues (thread-safe).
streamer.put(audio_chunks=audio_tensor, sample_indices=indices)
Same signature as AudioStreamer.put() but uses call_soon_threadsafe for async safety.

end

Signals end of generation (thread-safe).
streamer.end(sample_indices=None)

get_stream

Async generator for a specific sample’s audio stream.
async for audio_chunk in streamer.get_stream(sample_idx=0):
    # Process audio chunk
    await process_audio(audio_chunk)
  • sample_idx (int, required): Index of the sample to stream.

Returns: an async generator yielding audio chunks.

__aiter__

Returns an async iterator over all audio streams.
async for batch_chunks in streamer:
    # batch_chunks is dict: {sample_idx: audio_chunk}
    for idx, chunk in batch_chunks.items():
        await process_chunk(idx, chunk)
Returns: AsyncAudioBatchIterator, an async iterator yielding batch chunk dictionaries.
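
The thread-to-event-loop handoff that AsyncAudioStreamer performs can be sketched with plain asyncio primitives. Names and structure here are illustrative, not the library's actual code:

```python
import asyncio
import threading

STOP = None  # sentinel marking end of stream, playing the role of stop_signal

async def main():
    loop = asyncio.get_running_loop()
    q: asyncio.Queue = asyncio.Queue()

    def producer():
        # Runs in a worker thread (like the generation thread).
        # asyncio.Queue is not thread-safe, so items are handed to the
        # event loop via call_soon_threadsafe, as AsyncAudioStreamer.put does.
        for i in range(3):
            loop.call_soon_threadsafe(q.put_nowait, f"chunk-{i}")
        loop.call_soon_threadsafe(q.put_nowait, STOP)

    threading.Thread(target=producer).start()

    # Consume asynchronously until the sentinel arrives.
    chunks = []
    while (item := await q.get()) is not STOP:
        chunks.append(item)
    return chunks

chunks = asyncio.run(main())
```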

Async Usage Example

import asyncio
import torch
from vibevoice import (
    VibeVoiceStreamingForConditionalGenerationInference,
    VibeVoiceStreamingProcessor
)
from vibevoice.modular.streamer import AsyncAudioStreamer

async def generate_and_stream():
    # Create async streamer (must be in running loop)
    streamer = AsyncAudioStreamer(batch_size=1)
    
    # Load model and processor
    model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
        "microsoft/VibeVoice-Realtime-0.5B",
        torch_dtype=torch.bfloat16,
        device_map="cuda"
    )
    processor = VibeVoiceStreamingProcessor.from_pretrained(
        "microsoft/VibeVoice-Realtime-0.5B"
    )
    
    # Process inputs
    inputs = processor.process_input_with_cached_prompt(
        text="Hello from async world",
        cached_prompt=voice_prompt,
        return_tensors="pt"
    )
    
    # Start generation in executor (runs in thread pool)
    loop = asyncio.get_running_loop()
    gen_task = loop.run_in_executor(
        None,
        lambda: model.generate(
            **inputs,
            audio_streamer=streamer,
            tokenizer=processor.tokenizer,
            all_prefilled_outputs=voice_prompt
        )
    )
    
    # Stream audio asynchronously
    async for audio_chunk in streamer.get_stream(0):
        print(f"Received async chunk: {audio_chunk.shape}")
        # Await async audio processing
        # await async_play_audio(audio_chunk)
    
    # Wait for generation to complete
    outputs = await gen_task
    return outputs

# Run async function
outputs = asyncio.run(generate_and_stream())

WebSocket Streaming Example

import asyncio
import websockets
import torch
from vibevoice.modular.streamer import AsyncAudioStreamer

async def stream_to_websocket(websocket):
    # websockets >= 11 passes only the connection to the handler; older
    # versions also passed a `path` argument.
    # Receive text from client
    text = await websocket.recv()
    
    # Create async streamer
    streamer = AsyncAudioStreamer(batch_size=1)
    
    # Process input
    inputs = processor.process_input_with_cached_prompt(
        text=text,
        cached_prompt=voice_prompt,
        return_tensors="pt"
    )
    
    # Start generation
    loop = asyncio.get_running_loop()
    gen_task = loop.run_in_executor(
        None,
        lambda: model.generate(
            **inputs,
            audio_streamer=streamer,
            tokenizer=processor.tokenizer,
            all_prefilled_outputs=voice_prompt
        )
    )
    
    # Stream audio chunks to WebSocket client
    async for audio_chunk in streamer.get_stream(0):
        # Cast from bfloat16 before the NumPy conversion, then send raw bytes
        audio_bytes = audio_chunk.float().cpu().numpy().tobytes()
        await websocket.send(audio_bytes)
    
    await gen_task
    await websocket.close()

# Start WebSocket server
async def main():
    async with websockets.serve(stream_to_websocket, "localhost", 8765):
        await asyncio.Future()  # serve until cancelled

asyncio.run(main())

AudioSampleIterator

Iterator for a single audio stream from AudioStreamer.

Methods

__iter__

Returns self for iteration protocol.

__next__

Gets next audio chunk from queue, raises StopIteration when stream ends.
iterator = streamer.get_stream(0)
try:
    while True:
        chunk = next(iterator)
        process_chunk(chunk)
except StopIteration:
    print("Stream ended")

AudioBatchIterator

Iterator that yields audio chunks for all samples in a batch.

Behavior

  • Yields dictionaries mapping sample indices to audio chunks
  • Removes samples from active set when they finish
  • Uses non-blocking queue gets with retry logic
  • Stops iteration when all samples have finished
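
The non-blocking gets with retry logic described above can be sketched with stdlib queues (an illustrative stand-in, not the library's code; `drain_ready_chunks` is a hypothetical helper name):

```python
import queue

def drain_ready_chunks(queues, finished):
    """One polling pass: collect whatever chunks are ready right now.

    Mirrors the batch iterator's behavior: non-blocking gets, skipping
    samples that have already finished. Illustrative sketch only.
    """
    batch = {}
    for idx, q in enumerate(queues):
        if finished[idx]:
            continue  # sample removed from the active set
        try:
            batch[idx] = q.get_nowait()  # non-blocking get
        except queue.Empty:
            pass  # nothing ready for this sample; retry on the next pass
    return batch

queues = [queue.Queue(), queue.Queue()]
queues[0].put("a0")
queues[1].put("b0")
# Sample 1 has finished, so its pending chunk is ignored.
ready = drain_ready_chunks(queues, finished=[False, True])
```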

AsyncAudioBatchIterator

Async iterator for batch audio streaming.

Behavior

  • Uses asyncio.wait() with FIRST_COMPLETED strategy
  • Cancels pending tasks when at least one chunk is ready
  • Handles async queue operations safely
  • Stops when all samples complete

Notes

  • Thread Safety: AsyncAudioStreamer uses call_soon_threadsafe to safely add items from generation thread to async queues
  • Batch Size: Currently VibeVoice only supports batch_size=1
  • Sample Rate: Audio chunks are at 24kHz sample rate
  • Chunk Size: Varies based on generation window (typically ~6 speech tokens per chunk)
  • Real-Time Factor: With 5 diffusion steps, VibeVoice can achieve < 1.0 RTF on modern GPUs
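
As a rough worked example of the numbers above (the chunk size of 3,840 samples and the generation time are assumptions for illustration; real chunk sizes depend on the generation window):

```python
SAMPLE_RATE = 24_000  # Hz, per the notes above

# Hypothetical chunk of 3,840 samples.
chunk_samples = 3_840
chunk_seconds = chunk_samples / SAMPLE_RATE  # 0.16 s of audio

# Real-time factor: time spent generating divided by audio duration.
# RTF < 1.0 means audio is produced faster than it plays back.
generation_seconds = 0.10  # assumed measurement, for illustration
rtf = generation_seconds / chunk_seconds
```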

Integration with Model

The streamer is designed to integrate seamlessly with model generation:
outputs = model.generate(
    **inputs,
    audio_streamer=streamer,  # Pass streamer here
    tokenizer=processor.tokenizer,
    all_prefilled_outputs=voice_prompt
)
During generation, the model calls:
  • streamer.put(audio_chunk, sample_indices) for each generated chunk
  • streamer.end(sample_indices) when EOS is detected or max length reached
  • streamer.end() when generation completes
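
To make that contract concrete, here is a minimal, self-contained sketch of a streamer exposing the same put/end/get_stream protocol (a pure-Python stand-in with a single sample; the real class also handles batches, tensors, sample indices, and timeouts):

```python
import queue

class MiniStreamer:
    """Toy single-sample streamer mirroring the put/end/get_stream contract."""

    def __init__(self, stop_signal=None):
        self.stop_signal = stop_signal
        self.q = queue.Queue()

    def put(self, audio_chunk):
        # Called by the generator for each produced chunk.
        self.q.put(audio_chunk)

    def end(self):
        # Called when generation finishes; unblocks the consumer.
        self.q.put(self.stop_signal)

    def get_stream(self):
        # Yield chunks until the stop signal arrives.
        while (chunk := self.q.get()) is not self.stop_signal:
            yield chunk

streamer = MiniStreamer()
for i in range(3):
    streamer.put(f"chunk-{i}")  # what model.generate does per chunk
streamer.end()                  # what model.generate does on completion
received = list(streamer.get_stream())
```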
