
Overview

EdgeTransport is the core abstraction for connecting agents to real-time communication sessions. It provides a vendor-agnostic interface for WebRTC-based calls, allowing you to integrate with different providers (like GetStream) while maintaining consistent agent behavior.
from vision_agents.core.edge import EdgeTransport

Implementation

EdgeTransport is an abstract base class. Vision Agents provides the StreamEdge implementation for GetStream.io:
from vision_agents.plugins.getstream import StreamEdge

edge = StreamEdge()
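
The abstract-base-class pattern described above can be illustrated with a small, self-contained sketch. It does not use the real EdgeTransport: `Transport`, `InMemoryTransport`, and `run_session` are hypothetical names invented for this example, and the actual abstract methods of EdgeTransport may differ.

```python
import asyncio
from abc import ABC, abstractmethod


class Transport(ABC):
    """Hypothetical stand-in for an abstract transport; not the real EdgeTransport."""

    @abstractmethod
    async def create_call(self, call_id: str, **kwargs) -> dict:
        ...

    @abstractmethod
    async def close(self) -> None:
        ...


class InMemoryTransport(Transport):
    """Toy provider that keeps calls in a dict instead of talking to a network."""

    def __init__(self) -> None:
        self.calls: dict[str, dict] = {}

    async def create_call(self, call_id: str, **kwargs) -> dict:
        # Return the existing call if there is one, otherwise create it.
        return self.calls.setdefault(call_id, {"id": call_id, **kwargs})

    async def close(self) -> None:
        self.calls.clear()


async def run_session(transport: Transport, call_id: str) -> dict:
    # Depends only on the abstract interface, so any provider can be passed in.
    return await transport.create_call(call_id, call_type="default")


call = asyncio.run(run_session(InMemoryTransport(), "support-session-456"))
print(call)
```

Because `run_session` only touches the abstract interface, swapping providers means passing a different subclass; instantiating `Transport` directly raises `TypeError`.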

Properties

events
EventManager
Event manager for subscribing to transport events. All EdgeTransport implementations automatically register these required events:
  • AudioReceivedEvent: Audio data received from participants
  • TrackAddedEvent: Media track added to the call
  • TrackRemovedEvent: Media track removed from the call
  • CallEndedEvent: Call session ended

Methods

authenticate()

Authenticate an agent user with the transport.
from vision_agents.core.edge.types import User

await edge.authenticate(User(
    id="agent-123",
    name="Support Agent",
    image="https://example.com/avatar.png"
))
user (User, required): User object containing:
  • id: Unique identifier for the agent
  • name: Display name for the agent
  • image: Optional avatar URL

create_call()

Create a new call or retrieve an existing one.
call = await edge.create_call(
    call_id="support-session-456",
    call_type="default"  # GetStream specific
)
call_id (str, required): Unique identifier for the call session
**kwargs (dict): Transport-specific configuration options. For StreamEdge:
  • call_type (str): GetStream call type (default: "default")
Returns: Call object implementing the Call protocol

join()

Join a call and establish a connection.
connection = await edge.join(agent, call)
await connection.wait_for_participant(timeout=30.0)
agent (Agent, required): The Agent instance joining the call
call (Call, required): Call object from create_call()
**kwargs (dict): Transport-specific join options
Returns: Connection object for managing the session lifecycle

create_audio_track()

Create an audio stream track for sending audio to the call.
audio_track = edge.create_audio_track()
Returns: AudioStreamTrack for streaming PCM audio data

publish_tracks()

Publish audio and/or video tracks to the active call.
await edge.publish_tracks(
    audio_track=audio_track,
    video_track=None
)
audio_track (MediaStreamTrack | None): Audio track to publish (from create_audio_track())
video_track (MediaStreamTrack | None): Video track to publish (optional)

send_custom_event()

Send a custom event to all participants in the call.
await edge.send_custom_event({
    "type": "agent_status",
    "status": "thinking"
})
data (dict, required): JSON-serializable event payload (max 5KB for GetStream)
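
Since the payload must be JSON-serializable and stay under the size limit, it can help to validate it before sending. The helper below is a minimal sketch: `check_event_payload` and `MAX_EVENT_BYTES` are hypothetical names, and it assumes "5KB" means 5 * 1024 bytes of UTF-8 encoded JSON, which this page does not confirm.

```python
import json

# Assumption: "5KB" is read as 5 * 1024 bytes of UTF-8 encoded JSON.
MAX_EVENT_BYTES = 5 * 1024


def check_event_payload(data: dict, limit: int = MAX_EVENT_BYTES) -> int:
    """Return the encoded size in bytes, or raise ValueError if unusable."""
    try:
        encoded = json.dumps(data).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"payload is not JSON-serializable: {exc}") from exc
    if len(encoded) > limit:
        raise ValueError(f"payload is {len(encoded)} bytes, limit is {limit}")
    return len(encoded)


size = check_event_payload({"type": "agent_status", "status": "thinking"})
print(size)
```

Calling the helper just before `send_custom_event()` surfaces oversized or non-serializable payloads locally rather than as a transport error.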

close()

Close the transport and clean up all resources.
await edge.close()
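
To make sure close() runs even when the session fails midway, the call can be wrapped in an async context manager. This is a sketch only: `FakeEdge` is a hypothetical stand-in that exposes nothing but an async `close()`, and `closing_edge` is a helper written for this example, not part of Vision Agents.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeEdge:
    """Hypothetical stand-in exposing only an async close(), for illustration."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def closing_edge(edge):
    # Yield the transport and always await close(), even if the body raises.
    try:
        yield edge
    finally:
        await edge.close()


async def main() -> FakeEdge:
    edge = FakeEdge()
    try:
        async with closing_edge(edge):
            raise RuntimeError("simulated failure during the call")
    except RuntimeError:
        pass  # the error propagated out of the block; close() has already run
    return edge


edge = asyncio.run(main())
print(edge.closed)  # True
```

Any object with an awaitable `close()` method should work with the same helper.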

Event Subscription

Subscribe to transport events using the events manager:
from vision_agents.core.edge.events import AudioReceivedEvent, TrackAddedEvent

@edge.events.subscribe
async def on_audio(event: AudioReceivedEvent):
    print(f"Received audio from {event.participant.user_id}")
    # Process PCM data: event.pcm_data

@edge.events.subscribe
async def on_track_added(event: TrackAddedEvent):
    # TrackType must also be imported; its module path is not shown on this page
    if event.track_type == TrackType.VIDEO:
        print(f"Video track added: {event.track_id}")
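
The handlers above are registered without naming the event type, which suggests the manager picks it up from the type annotation. One way such dispatch could work is sketched below. `TinyEventManager`, `AudioEvent`, and `TrackEvent` are hypothetical and written for this example; the real EventManager may be implemented differently.

```python
import asyncio
import inspect
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class AudioEvent:  # hypothetical event type, not the library's AudioReceivedEvent
    user_id: str


@dataclass
class TrackEvent:  # hypothetical event type, not the library's TrackAddedEvent
    track_id: str


class TinyEventManager:
    """Toy dispatcher that routes events by the handler's parameter annotation."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)

    def subscribe(self, handler):
        # Read the annotation of the first parameter to learn the event type.
        first_param = next(iter(inspect.signature(handler).parameters.values()))
        if first_param.annotation is inspect.Parameter.empty:
            raise TypeError("handler needs a type-annotated event parameter")
        self._handlers[first_param.annotation].append(handler)
        return handler  # returning the handler lets subscribe act as a decorator

    async def send(self, event) -> None:
        # Only handlers registered for this exact event class are called.
        for handler in self._handlers[type(event)]:
            await handler(event)


events = TinyEventManager()
seen = []


@events.subscribe
async def on_audio(event: AudioEvent):
    seen.append(f"audio from {event.user_id}")


@events.subscribe
async def on_track(event: TrackEvent):
    seen.append(f"track {event.track_id}")


async def main() -> None:
    await events.send(AudioEvent(user_id="user-1"))
    await events.send(TrackEvent(track_id="track-9"))


asyncio.run(main())
print(seen)
```

Each handler receives only the events matching its annotation, so the handlers stay small and independent.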

StreamEdge Implementation

The StreamEdge class implements EdgeTransport using GetStream.io’s infrastructure:
from vision_agents.plugins.getstream import StreamEdge
from vision_agents.core.edge.types import User

# Initialize
edge = StreamEdge()

# Authenticate
await edge.authenticate(User(id="agent-1", name="AI Agent"))

# Create and join call (`agent` is an existing Agent instance, created elsewhere)
call = await edge.create_call("my-call-id")
connection = await edge.join(agent, call)

# Publish audio
audio_track = edge.create_audio_track()
await edge.publish_tracks(audio_track=audio_track, video_track=None)

# Wait for participants
await connection.wait_for_participant(timeout=60.0)

StreamEdge-Specific Methods

open_demo() (async method)
Open a browser demo interface for testing:
url = await edge.open_demo(call)
print(f"Demo URL: {url}")

create_conversation() (async method)
Create a chat conversation channel linked to the call:
# `user` is a User instance defined elsewhere (not shown here)
conversation = await edge.create_conversation(
    call=call,
    user=user,
    instructions="You are a helpful assistant"
)
