AudioStreamer
Synchronous audio streamer that stores audio chunks in queues for real-time streaming during generation. Allows consuming generated audio as it’s produced rather than waiting for complete generation.
Class Signature
class AudioStreamer(BaseStreamer):
def __init__(
self,
batch_size: int,
stop_signal: Optional[Any] = None,
timeout: Optional[float] = None
)
Initialization
The batch size for generation. Currently VibeVoice only supports batch_size=1
The signal to put in queue when generation ends. Defaults to None
Timeout for queue operations in seconds. If None, queues block indefinitely
Attributes
List of Queue objects, one for each sample in the batch
Flags indicating which samples have finished generation
Maps sample indices to queue indices
Methods
put
Receives audio chunks and places them in appropriate queues.
streamer.put(audio_chunks=audio_tensor, sample_indices=indices)
Tensor of shape (num_samples, ...) containing audio chunks
Tensor indicating which samples these chunks belong to
end
Signals the end of generation for specified samples or all samples.
# End all samples
streamer.end()
# End specific samples
streamer.end(sample_indices=torch.tensor([0]))
Optional tensor of sample indices to end. If None, ends all samples
get_stream
Get an iterator for a specific sample’s audio stream.
stream = streamer.get_stream(sample_idx=0)
for audio_chunk in stream:
# Process audio chunk
play_audio(audio_chunk)
Index of the sample to get stream for (0 to batch_size-1)
Iterator that yields audio chunks for the specified sample
__iter__
Returns an iterator over all audio streams in the batch.
for batch_chunks in streamer:
# batch_chunks is a dict: {sample_idx: audio_chunk}
for idx, chunk in batch_chunks.items():
process_chunk(idx, chunk)
Iterator that yields dictionaries mapping sample indices to audio chunks
Usage Example
import torch
from vibevoice import (
VibeVoiceStreamingForConditionalGenerationInference,
VibeVoiceStreamingProcessor
)
from vibevoice.modular.streamer import AudioStreamer
# Create streamer
streamer = AudioStreamer(batch_size=1)
# Load model and processor
model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
"microsoft/VibeVoice-Realtime-0.5B",
torch_dtype=torch.bfloat16,
device_map="cuda"
)
processor = VibeVoiceStreamingProcessor.from_pretrained(
"microsoft/VibeVoice-Realtime-0.5B"
)
# Process inputs
inputs = processor.process_input_with_cached_prompt(
text="Hello world",
cached_prompt=voice_prompt,
return_tensors="pt"
)
# Start generation in separate thread
import threading
def generate():
outputs = model.generate(
**inputs,
audio_streamer=streamer,
tokenizer=processor.tokenizer,
all_prefilled_outputs=voice_prompt
)
gen_thread = threading.Thread(target=generate)
gen_thread.start()
# Stream audio as it's generated
for audio_chunk in streamer.get_stream(0):
# Play or process audio chunk in real-time
print(f"Received chunk of shape: {audio_chunk.shape}")
# play_audio(audio_chunk)
gen_thread.join()
Real-Time Playback Example
import sounddevice as sd
import numpy as np
import threading
from vibevoice.modular.streamer import AudioStreamer
streamer = AudioStreamer(batch_size=1)
# Start generation in background
gen_thread = threading.Thread(target=lambda: model.generate(
**inputs,
audio_streamer=streamer,
tokenizer=processor.tokenizer,
all_prefilled_outputs=voice_prompt
))
gen_thread.start()
# Play audio in real-time
for audio_chunk in streamer.get_stream(0):
# Convert to numpy and play
audio_np = audio_chunk.cpu().numpy()
sd.play(audio_np, samplerate=24000)
sd.wait() # Wait for chunk to finish playing
gen_thread.join()
AsyncAudioStreamer
Asynchronous version of AudioStreamer for use in async contexts with asyncio.
Class Signature
class AsyncAudioStreamer(AudioStreamer):
def __init__(
self,
batch_size: int,
stop_signal: Optional[Any] = None,
timeout: Optional[float] = None
)
Initialization
Same parameters as AudioStreamer. Must be created within a running event loop.
The batch size for generation
Signal to indicate end of generation
Timeout for async queue operations
Attributes
List of asyncio Queue objects for async operation
loop
asyncio.AbstractEventLoop
Reference to the running event loop
Methods
put
Puts audio chunks in async queues (thread-safe).
streamer.put(audio_chunks=audio_tensor, sample_indices=indices)
Same signature as AudioStreamer.put() but uses call_soon_threadsafe for async safety.
end
Signals end of generation (thread-safe).
streamer.end(sample_indices=None)
get_stream
Async generator for a specific sample’s audio stream.
async for audio_chunk in streamer.get_stream(sample_idx=0):
# Process audio chunk
await process_audio(audio_chunk)
Index of sample to stream
Async generator yielding audio chunks
__aiter__
Returns an async iterator over all audio streams.
async for batch_chunks in streamer:
# batch_chunks is dict: {sample_idx: audio_chunk}
for idx, chunk in batch_chunks.items():
await process_chunk(idx, chunk)
Async iterator yielding batch chunk dictionaries
Async Usage Example
import asyncio
import torch
from vibevoice import (
VibeVoiceStreamingForConditionalGenerationInference,
VibeVoiceStreamingProcessor
)
from vibevoice.modular.streamer import AsyncAudioStreamer
async def generate_and_stream():
# Create async streamer (must be in running loop)
streamer = AsyncAudioStreamer(batch_size=1)
# Load model and processor
model = VibeVoiceStreamingForConditionalGenerationInference.from_pretrained(
"microsoft/VibeVoice-Realtime-0.5B",
torch_dtype=torch.bfloat16,
device_map="cuda"
)
processor = VibeVoiceStreamingProcessor.from_pretrained(
"microsoft/VibeVoice-Realtime-0.5B"
)
# Process inputs
inputs = processor.process_input_with_cached_prompt(
text="Hello from async world",
cached_prompt=voice_prompt,
return_tensors="pt"
)
# Start generation in executor (runs in thread pool)
loop = asyncio.get_running_loop()
gen_task = loop.run_in_executor(
None,
lambda: model.generate(
**inputs,
audio_streamer=streamer,
tokenizer=processor.tokenizer,
all_prefilled_outputs=voice_prompt
)
)
# Stream audio asynchronously
async for audio_chunk in streamer.get_stream(0):
print(f"Received async chunk: {audio_chunk.shape}")
# Await async audio processing
# await async_play_audio(audio_chunk)
# Wait for generation to complete
outputs = await gen_task
return outputs
# Run async function
outputs = asyncio.run(generate_and_stream())
WebSocket Streaming Example
import asyncio
import websockets
import torch
from vibevoice.modular.streamer import AsyncAudioStreamer
async def stream_to_websocket(websocket, path):
# Receive text from client
text = await websocket.recv()
# Create async streamer
streamer = AsyncAudioStreamer(batch_size=1)
# Process input
inputs = processor.process_input_with_cached_prompt(
text=text,
cached_prompt=voice_prompt,
return_tensors="pt"
)
# Start generation
loop = asyncio.get_running_loop()
gen_task = loop.run_in_executor(
None,
lambda: model.generate(
**inputs,
audio_streamer=streamer,
tokenizer=processor.tokenizer,
all_prefilled_outputs=voice_prompt
)
)
# Stream audio chunks to WebSocket client
async for audio_chunk in streamer.get_stream(0):
# Convert to bytes and send
audio_bytes = audio_chunk.cpu().numpy().tobytes()
await websocket.send(audio_bytes)
await gen_task
await websocket.close()
# Start WebSocket server
async def main():
    async with websockets.serve(stream_to_websocket, "localhost", 8765):
        await asyncio.Future()  # run until cancelled
asyncio.run(main())
AudioSampleIterator
Iterator for a single audio stream from AudioStreamer.
Methods
__iter__
Returns self for iteration protocol.
__next__
Gets next audio chunk from queue, raises StopIteration when stream ends.
iterator = streamer.get_stream(0)
try:
while True:
chunk = next(iterator)
process_chunk(chunk)
except StopIteration:
print("Stream ended")
AudioBatchIterator
Iterator that yields audio chunks for all samples in a batch.
Behavior
- Yields dictionaries mapping sample indices to audio chunks
- Removes samples from active set when they finish
- Uses non-blocking queue gets with retry logic
- Stops iteration when all samples have finished
AsyncAudioBatchIterator
Async iterator for batch audio streaming.
Behavior
- Uses asyncio.wait() with the FIRST_COMPLETED strategy
- Cancels pending tasks when at least one chunk is ready
- Handles async queue operations safely
- Stops when all samples complete
Notes
- Thread Safety: AsyncAudioStreamer uses call_soon_threadsafe to safely add items from the generation thread to async queues
- Batch Size: Currently VibeVoice only supports batch_size=1
- Sample Rate: Audio chunks are at 24kHz sample rate
- Chunk Size: Varies based on generation window (typically ~6 speech tokens per chunk)
- Real-Time Factor: With 5 diffusion steps, VibeVoice can achieve < 1.0 RTF on modern GPUs
Integration with Model
The streamer is designed to integrate seamlessly with model generation:
outputs = model.generate(
**inputs,
audio_streamer=streamer, # Pass streamer here
tokenizer=processor.tokenizer,
all_prefilled_outputs=voice_prompt
)
During generation, the model calls:
streamer.put(audio_chunk, sample_indices) for each generated chunk
streamer.end(sample_indices) when EOS is detected or max length reached
streamer.end() when generation completes