Overview
Connection represents an active agent connection to a real-time communication session. It manages the lifecycle of the agent’s participation, tracks participant presence, and provides control over the connection state.
from vision_agents.core.edge.types import Connection
Connection Lifecycle
- Establish - Connection created by
EdgeTransport.join()
- Wait - Use
wait_for_participant() to wait for users to join
- Active - Monitor with
idle_since() to detect when participants leave
- Close - Terminate with
close() to clean up resources
# Establish connection
connection = await edge.join(agent, call)
# Wait for participant
await connection.wait_for_participant(timeout=30.0)
# Monitor activity
if connection.idle_since() > 0:
print("All participants have left")
# Clean up
await connection.close()
Methods
wait_for_participant()
Wait for at least one participant (other than the agent) to join the call.
try:
await connection.wait_for_participant(timeout=60.0)
print("Participant joined!")
except asyncio.TimeoutError:
print("No one joined within 60 seconds")
await connection.close()
Maximum time to wait in seconds. None means wait indefinitely.
Raises: asyncio.TimeoutError if timeout is reached before a participant joins
idle_since()
Return the timestamp when all participants left (except the agent).
idle_time = connection.idle_since()
if idle_time == 0.0:
print("Connection is active")
else:
elapsed = time.time() - idle_time
print(f"Connection idle for {elapsed:.1f} seconds")
Returns: Timestamp (from time.time()) when connection became idle, or 0.0 if still active
close()
Close the connection and clean up resources.
This method:
- Disconnects from the call session
- Releases network resources
- Cleans up event subscriptions
- Stops media tracks
Connection States
A connection transitions through these states:
| State | idle_since() | Description |
|---|
| Waiting | 0.0 | No participants have joined yet |
| Active | 0.0 | One or more participants present |
| Idle | > 0.0 | All participants have left |
| Closed | N/A | Connection terminated |
StreamConnection Implementation
When using StreamEdge, you receive a StreamConnection instance:
from vision_agents.plugins.getstream import StreamEdge
edge = StreamEdge()
call = await edge.create_call("my-call")
connection = await edge.join(agent, call) # Returns StreamConnection
# Access participant state (StreamConnection-specific)
participants = connection.participants
for participant in participants:
print(f"User {participant.user_id} is in the call")
Event-Driven Participant Tracking
Connections automatically track participants by subscribing to transport events:
from vision_agents.core.edge.events import TrackAddedEvent, TrackRemovedEvent
connection = await edge.join(agent, call)
@edge.events.subscribe
async def on_track_added(event: TrackAddedEvent):
if event.participant:
print(f"Track added by {event.participant.user_id}")
print(f"Track type: {event.track_type.name}")
@edge.events.subscribe
async def on_track_removed(event: TrackRemovedEvent):
if event.participant:
print(f"Track removed from {event.participant.user_id}")
Participant and TrackType
Participant
Represents a participant in the call:
from vision_agents.core.edge.types import Participant
@dataclass
class Participant:
original: Any # Provider-specific participant object
user_id: str # User identifier (may not be unique)
id: str # Unique participant ID for this call session
Fields:
The original participant object from the connectivity provider (e.g., GetStream’s protobuf participant)
User identifier. Not necessarily unique - the same user can join multiple times.
Unique identifier for this participant instance during the call (e.g., user123__session456)
TrackType
Enumerates media track types:
from vision_agents.core.edge.types import TrackType
class TrackType(IntEnum):
UNSPECIFIED = 0
AUDIO = 1
VIDEO = 2
SCREEN_SHARE = 3
SCREEN_SHARE_AUDIO = 4
Values:
Unknown or unspecified track type
Audio track (microphone input)
Video track (camera input)
Screen share audio track (system audio)
Usage Example
import asyncio
import time
from vision_agents.core import Agent
from vision_agents.plugins.getstream import StreamEdge
from vision_agents.core.edge.types import User, TrackType
from vision_agents.core.edge.events import TrackAddedEvent
async def run_agent():
edge = StreamEdge()
# Authenticate
await edge.authenticate(User(
id="agent-support-1",
name="Support Agent"
))
# Create call and join
call = await edge.create_call("support-session-789")
connection = await edge.join(agent, call)
# Subscribe to track events
@edge.events.subscribe
async def on_track_added(event: TrackAddedEvent):
if event.track_type == TrackType.AUDIO:
print(f"Audio track: {event.track_id}")
elif event.track_type == TrackType.VIDEO:
print(f"Video track: {event.track_id}")
try:
# Wait up to 60 seconds for a participant
print("Waiting for participant...")
await connection.wait_for_participant(timeout=60.0)
print("Participant joined!")
# Monitor for inactivity
while True:
idle_time = connection.idle_since()
if idle_time > 0:
elapsed = time.time() - idle_time
if elapsed > 30.0:
print("Idle for 30s, ending call")
break
await asyncio.sleep(1.0)
except asyncio.TimeoutError:
print("No participant joined, closing")
finally:
await connection.close()
await edge.close()
asyncio.run(run_agent())
See Also
- EdgeTransport - Creating connections with
join()
- Call - Call session interface
- Events - Subscribing to connection events