Documentation Index
Fetch the complete documentation index at: https://mintlify.com/moqtail/moqtail/llms.txt
Use this file to discover all available pages before exploring further.
Relay
The relay module provides server-side functionality for building MOQT relay servers that forward media streams between publishers and subscribers. A relay acts as an intermediary that accepts published tracks and redistributes them to subscribed clients. The relay module's structure is defined in Moqtail, but its implementation is provided primarily by the transport and model layers.
Relay Architecture
A MOQT relay server follows this architecture:

┌─────────────────────────────────────────────┐
│           Relay Application Logic           │
│     (Accept connections, route streams)     │
└──────────┬────────────────┬─────────────────┘
           │                │
    ┌──────▼──────┐  ┌──────▼──────┐
    │  Publisher  │  │ Subscriber  │
    │ Connection  │  │ Connection  │
    └──────┬──────┘  └──────┬──────┘
           │                │
    ┌──────▼────────────────▼──────┐
    │     ControlStreamHandler     │
    │ (Setup, Publish, Subscribe)  │
    └──────┬──────────────┬────────┘
           │              │
    ┌──────▼──────┐ ┌─────▼─────────┐
    │   Receive   │ │    Forward    │
    │    Media    │ │     Media     │
    └─────────────┘ └───────────────┘
Server Setup
Creating a Server Endpoint
Set up a WebTransport/QUIC server:

use wtransport::{ServerConfig, Endpoint, Identity};
use std::net::SocketAddr;
/// Entry point of the relay server example.
///
/// Initialises the `tracing_subscriber` formatter, builds a server endpoint
/// bound to `0.0.0.0:4433` with a self-signed identity for `localhost`,
/// prints the bound address, and then hands the endpoint to
/// `accept_connections`.
///
/// # Errors
///
/// Returns the first error raised while creating the identity, parsing the
/// bind address, creating the endpoint, reading its local address, or
/// running the accept loop.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // Certificate the server presents to clients (self-signed for "localhost").
    let identity = Identity::self_signed(std::iter::once("localhost"))?;

    // Listen on every interface, port 4433.
    let bind_address: SocketAddr = "0.0.0.0:4433".parse()?;
    let config = ServerConfig::builder()
        .with_bind_address(bind_address)
        .with_identity(identity)
        .build();

    let endpoint = Endpoint::server(config)?;
    let local_address = endpoint.local_addr()?;
    println!("MOQT relay listening on {}", local_address);

    // The accept loop's result is this program's result.
    accept_connections(endpoint).await
}
Accepting Connections
Handle incoming connections in a loop:

use wtransport::Endpoint;
/// Accepts sessions on `endpoint` forever, serving each one on its own task.
///
/// For every incoming session a Tokio task is spawned that awaits the
/// session request, accepts it, and passes the resulting connection to
/// `handle_connection`. Failures inside a task are written to stderr and end
/// only that task; the accept loop itself keeps running.
///
/// The loop has no exit path, so this function does not return under normal
/// operation even though its signature allows an error.
async fn accept_connections(endpoint: Endpoint) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        let pending_session = endpoint.accept().await;

        // One task per client so a slow handshake never blocks the accept loop.
        tokio::spawn(async move {
            // Step 1: wait for the client's session request.
            let request = match pending_session.await {
                Ok(request) => request,
                Err(err) => {
                    eprintln!("Failed to receive session request: {:?}", err);
                    return;
                }
            };

            // Step 2: accept the session to obtain a usable connection.
            let connection = match request.accept().await {
                Ok(connection) => connection,
                Err(err) => {
                    eprintln!("Failed to accept session: {:?}", err);
                    return;
                }
            };

            println!("New connection from: {:?}", connection.remote_address());

            // Step 3: run the MOQT session; report, but do not propagate, errors.
            match handle_connection(connection).await {
                Ok(()) => {}
                Err(err) => eprintln!("Connection error: {:?}", err),
            }
        });
    }
}
Connection Handling
Server Setup Handshake
Perform the MOQT setup handshake with clients:

use moqtail::transport::control_stream_handler::ControlStreamHandler;
use moqtail::model::control::client_setup::ClientSetup;
use moqtail::model::control::server_setup::ServerSetup;
use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::common::pair::KeyValuePair;
use moqtail::model::error::TerminationCode;
use wtransport::Connection;
use bytes::Bytes;
/// Runs one client session: setup handshake first, then the control loop.
///
/// Accepts the client's bidirectional control stream, raises the send side
/// to the maximum priority, and expects a `ClientSetup` as the first control
/// message. If the client lists `DRAFT_14` among its supported versions, a
/// `ServerSetup` selecting that version is sent back and control passes to
/// `handle_client_messages`.
///
/// # Errors
///
/// Fails when the control stream cannot be accepted, when the first message
/// is missing or is not `ClientSetup`, when the client does not offer
/// `DRAFT_14`, when sending `ServerSetup` fails, or when the message loop
/// returns an error.
///
/// # Panics
///
/// Panics if `KeyValuePair::try_new_bytes` rejects the setup parameter.
async fn handle_connection(connection: Connection) -> Result<(), Box<dyn std::error::Error>> {
    // The client opens the control stream; the relay accepts it.
    let (send_stream, recv_stream) = connection.accept_bi().await?;

    // Give the control stream the highest possible send priority.
    send_stream.set_priority(i32::MAX);

    let mut control_handler = ControlStreamHandler::new(send_stream, recv_stream);

    // The first control message must be the client's setup.
    let client_setup = match control_handler.next_message().await {
        Ok(ControlMessage::ClientSetup(setup)) => setup,
        Ok(_) => return Err("Expected ClientSetup message".into()),
        Err(e) => return Err(format!("Setup error: {:?}", e).into()),
    };
    println!("Client supports versions: {:?}", client_setup.supported_versions);

    // This relay speaks exactly one version; refuse clients that do not offer it.
    let version_supported = client_setup.supported_versions.contains(&DRAFT_14);
    if !version_supported {
        return Err("No compatible version".into());
    }

    // Answer with the selected version and a single setup parameter.
    let setup_parameter =
        KeyValuePair::try_new_bytes(1, Bytes::from_static(b"relay-server")).unwrap();
    let server_setup = ServerSetup {
        selected_version: DRAFT_14,
        setup_parameters: vec![setup_parameter],
    };
    let reply = ControlMessage::ServerSetup(Box::new(server_setup));
    control_handler.send(&reply).await?;
    println!("Setup handshake complete");

    // Handshake done: process the client's control messages.
    handle_client_messages(control_handler, connection).await
}
Processing Control Messages
Handle different types of control messages:

use moqtail::model::control::publish::Publish;
use moqtail::model::control::publish_namespace::PublishNamespace;
use moqtail::model::control::subscribe::Subscribe;
/// Reads control messages from one client and dispatches them to handlers.
///
/// `PublishNamespace`, `Publish`, `Subscribe` and `Unsubscribe` go to their
/// dedicated handlers; any other message is only logged by type. The loop
/// ends when reading fails: `TerminationCode::NoError` is reported as a clean
/// disconnect, anything else as a control stream error. In both cases the
/// function returns `Ok(())`.
///
/// `connection` is not used in this body; the function takes ownership of it
/// and it stays alive until the function returns.
///
/// # Errors
///
/// Returns the error of whichever handler fails first.
async fn handle_client_messages(
    mut control: ControlStreamHandler,
    connection: Connection,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // Read the next message; any read failure ends the session.
        let message = match control.next_message().await {
            Ok(message) => message,
            Err(TerminationCode::NoError) => {
                println!("Client disconnected cleanly");
                break;
            }
            Err(e) => {
                eprintln!("Control stream error: {:?}", e);
                break;
            }
        };

        // Dispatch by message type.
        match message {
            ControlMessage::PublishNamespace(publish_ns) => {
                handle_publish_namespace(&mut control, publish_ns).await?
            }
            ControlMessage::Publish(publish) => handle_publish(&mut control, publish).await?,
            ControlMessage::Subscribe(subscribe) => {
                handle_subscribe(&mut control, subscribe).await?
            }
            ControlMessage::Unsubscribe(unsub) => {
                handle_unsubscribe(&mut control, unsub).await?
            }
            other => println!("Received control message: {:?}", other.get_type()),
        }
    }

    Ok(())
}
Publishing
Handling Publish Namespace
Process namespace publication announcements:

use moqtail::model::control::publish_namespace::PublishNamespace;
use moqtail::model::control::publish_namespace_ok::PublishNamespaceOk;
use moqtail::model::control::publish_namespace_error::PublishNamespaceError;
use moqtail::model::control::constant::PublishNamespaceErrorCode;
use moqtail::model::common::reason_phrase::ReasonPhrase;
/// Answers a `PublishNamespace` request with either an OK or an error.
///
/// The namespace is passed to `authorize_namespace`. An authorized request is
/// answered with `PublishNamespaceOk`; an unauthorized one with
/// `PublishNamespaceError` carrying `PublishNamespaceErrorCode::Unauthorized`.
/// Both replies echo the request's `request_id`.
///
/// # Errors
///
/// Fails if the reason phrase cannot be constructed or if sending the reply
/// on the control stream fails.
async fn handle_publish_namespace(
    control: &mut ControlStreamHandler,
    publish_ns: Box<PublishNamespace>,
) -> Result<(), Box<dyn std::error::Error>> {
    println!("Publish namespace request: {:?}", publish_ns.track_namespace);

    // Authorization decides which reply is built.
    let authorized = authorize_namespace(&publish_ns.track_namespace).await;

    let reply = if authorized {
        ControlMessage::PublishNamespaceOk(Box::new(PublishNamespaceOk {
            request_id: publish_ns.request_id,
        }))
    } else {
        ControlMessage::PublishNamespaceError(Box::new(PublishNamespaceError {
            request_id: publish_ns.request_id,
            error_code: PublishNamespaceErrorCode::Unauthorized,
            reason_phrase: ReasonPhrase::try_new("Unauthorized namespace".to_string())?,
        }))
    };

    control.send(&reply).await?;

    // Only a successfully delivered acceptance is logged.
    if authorized {
        println!("Accepted namespace publication");
    }

    Ok(())
}
/// Decides whether a publisher may announce `namespace`.
///
/// Placeholder implementation: `namespace` is ignored and the function always
/// returns `true`, so every namespace is accepted.
async fn authorize_namespace(namespace: &Tuple) -> bool {
// TODO: replace with real authorization logic, for example a check against
// a list of allowed namespaces.
true
}
Handling Track Publication
Process individual track publications:

use moqtail::model::control::publish::Publish;
use moqtail::model::control::publish_ok::PublishOk;
use moqtail::model::control::publish_error::PublishError;
use moqtail::model::control::constant::PublishErrorCode;
/// Registers a published track and acknowledges it with `PublishOk`.
///
/// The track's full name is logged and passed to `register_track`; the alias
/// it returns is sent back in a `PublishOk` that echoes the request's
/// `request_id`.
///
/// # Errors
///
/// Fails if track registration fails or the reply cannot be sent.
async fn handle_publish(
    control: &mut ControlStreamHandler,
    publish: Box<Publish>,
) -> Result<(), Box<dyn std::error::Error>> {
    let full_track_name = publish.get_full_track_name();
    println!("Publish track: {}", full_track_name);

    // Registering the track yields the alias reported back to the publisher.
    let track_alias = register_track(full_track_name).await?;

    let publish_ok = PublishOk {
        request_id: publish.request_id,
        track_alias,
        // The original example labels this value "1 hour", which assumes seconds.
        // NOTE(review): MOQT drafts define Expires in milliseconds - confirm the
        // unit this field uses.
        expires: 3600,
    };
    let reply = ControlMessage::PublishOk(Box::new(publish_ok));
    control.send(&reply).await?;

    Ok(())
}
/// Registers `track_name` with the relay and returns the alias assigned to it.
///
/// Placeholder implementation: `track_name` is ignored and the alias is always
/// `1`, so aliases are not unique until real registration logic is added.
async fn register_track(
track_name: FullTrackName,
) -> Result<u64, Box<dyn std::error::Error>> {
// TODO: implement track registration and assign a track alias that is
// unique for this connection.
Ok(1)
}
Subscription Management
Handling Subscribe Requests
Process subscription requests from clients:

use moqtail::model::control::subscribe::Subscribe;
use moqtail::model::control::subscribe_ok::SubscribeOk;
use moqtail::model::control::subscribe_error::SubscribeError;
use moqtail::model::control::constant::{GroupOrder, SubscribeErrorCode};
use moqtail::model::common::location::Location;
/// Answers a `Subscribe` request and, on success, starts forwarding.
///
/// The requested track is looked up with `find_track`. If it is unknown, a
/// `SubscribeError` with `SubscribeErrorCode::TrackNotFound` is sent. If it
/// is known, an alias is obtained from `assign_track_alias`, a `SubscribeOk`
/// reporting the track's latest group/object as `largest_location` is sent,
/// and `start_forwarding` is called.
///
/// # Errors
///
/// Fails if alias assignment, reason-phrase construction, sending a reply,
/// or starting the forwarder fails.
async fn handle_subscribe(
    control: &mut ControlStreamHandler,
    subscribe: Box<Subscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
    let track_name = subscribe.get_full_track_name();
    println!("Subscribe request for: {}", track_name);

    // Unknown track: reject the subscription and stop here.
    let track_info = match find_track(&track_name).await {
        Some(info) => info,
        None => {
            let rejection = SubscribeError {
                request_id: subscribe.request_id,
                error_code: SubscribeErrorCode::TrackNotFound,
                reason_phrase: ReasonPhrase::try_new("Track not available".to_string())?,
                track_alias: 0,
            };
            control
                .send(&ControlMessage::SubscribeError(Box::new(rejection)))
                .await?;
            return Ok(());
        }
    };

    // Known track: pick the alias this subscription will use.
    let track_alias = assign_track_alias(&track_name).await?;

    // Tell the subscriber where the track currently stands.
    let largest_location = Location {
        group: track_info.latest_group,
        object: track_info.latest_object,
    };
    let acceptance = SubscribeOk {
        request_id: subscribe.request_id,
        track_alias,
        expires: 3600,
        group_order: GroupOrder::Ascending,
        content_exists: true,
        largest_location: Some(largest_location),
        subscribe_parameters: None,
    };
    control
        .send(&ControlMessage::SubscribeOk(Box::new(acceptance)))
        .await?;

    // Forwarding begins only after the acceptance has been sent.
    start_forwarding(track_alias, subscribe).await?;

    Ok(())
}
/// Latest known position of a track, as returned by `find_track`.
///
/// `handle_subscribe` copies both fields into the `largest_location` of the
/// `SubscribeOk` it sends.
struct TrackInfo {
/// Group component of the latest known location.
latest_group: u64,
/// Object component of the latest known location.
latest_object: u64,
}
/// Looks up the relay's information about `track_name`.
///
/// Placeholder implementation: `track_name` is ignored and a fixed
/// `TrackInfo` (group 42, object 10) is always returned, so callers never see
/// `None` until a real lookup is implemented.
async fn find_track(track_name: &FullTrackName) -> Option<TrackInfo> {
// TODO: implement the actual track lookup.
Some(TrackInfo {
latest_group: 42,
latest_object: 10,
})
}
/// Returns the track alias to use for a subscription to `track_name`.
///
/// Placeholder implementation: `track_name` is ignored and the alias is always
/// `1`, so aliases are not unique until real assignment logic is added.
async fn assign_track_alias(
track_name: &FullTrackName,
) -> Result<u64, Box<dyn std::error::Error>> {
// TODO: assign a unique track alias.
Ok(1)
}
/// Starts forwarding track data to the subscriber described by `subscribe`.
///
/// Placeholder implementation: both arguments are ignored and the function
/// returns `Ok(())` without doing any work.
async fn start_forwarding(
track_alias: u64,
subscribe: Box<Subscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
// TODO: implement the data forwarding logic.
Ok(())
}
Handling Unsubscribe
Clean up subscriptions:

use moqtail::model::control::unsubscribe::Unsubscribe;
/// Handles an `Unsubscribe` by stopping forwarding for its track alias.
///
/// Nothing is sent back to the client; `control` is accepted so the signature
/// matches the other handlers but is not used in this body.
///
/// # Errors
///
/// Returns whatever error `stop_forwarding` reports.
async fn handle_unsubscribe(
    control: &mut ControlStreamHandler,
    unsub: Box<Unsubscribe>,
) -> Result<(), Box<dyn std::error::Error>> {
    let track_alias = unsub.track_alias;
    println!("Unsubscribe from track alias: {}", track_alias);

    // Tearing down the forwarder is the only work required here.
    stop_forwarding(track_alias).await
}
/// Stops forwarding data for the subscription identified by `track_alias`.
///
/// Placeholder implementation: `track_alias` is ignored and the function
/// returns `Ok(())` without doing any work.
async fn stop_forwarding(track_alias: u64) -> Result<(), Box<dyn std::error::Error>> {
// TODO: implement forwarding cleanup.
Ok(())
}
State Management
Track Registry
Maintain a registry of active tracks:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use moqtail::model::data::full_track_name::FullTrackName;
use moqtail::model::data::track::Track;
/// Shared relay bookkeeping: known tracks plus publisher and subscriber
/// connections.
///
/// Each map sits behind its own `Arc<RwLock<..>>`, so the three maps are
/// locked independently of one another.
struct RelayState {
/// Tracks keyed by their full track name.
tracks: Arc<RwLock<HashMap<FullTrackName, Track>>>,
/// Publisher connections keyed by a `u64`.
/// NOTE(review): nothing in this snippet populates the map; the key is
/// presumably a track alias - confirm.
publishers: Arc<RwLock<HashMap<u64, Connection>>>,
/// Subscriber connections grouped by track alias.
subscribers: Arc<RwLock<HashMap<u64, Vec<Connection>>>>,
}
impl RelayState {
    /// Creates a state object whose three registries are all empty.
    fn new() -> Self {
        let tracks = Arc::new(RwLock::new(HashMap::new()));
        let publishers = Arc::new(RwLock::new(HashMap::new()));
        let subscribers = Arc::new(RwLock::new(HashMap::new()));
        Self {
            tracks,
            publishers,
            subscribers,
        }
    }

    /// Stores `track` under its own `full_track_name`, replacing any track
    /// previously stored under the same name.
    async fn add_track(&self, track: Track) {
        self.tracks
            .write()
            .await
            .insert(track.full_track_name.clone(), track);
    }

    /// Returns a clone of the track stored under `name`, or `None` if no such
    /// track is registered.
    async fn get_track(&self, name: &FullTrackName) -> Option<Track> {
        self.tracks.read().await.get(name).cloned()
    }

    /// Appends `conn` to the subscriber list for `track_alias`, creating the
    /// list on first use.
    async fn add_subscriber(&self, track_alias: u64, conn: Connection) {
        self.subscribers
            .write()
            .await
            .entry(track_alias)
            .or_default()
            .push(conn);
    }
}
Complete Relay Example
Here’s a simplified complete relay implementation:

use moqtail::model::control::control_message::ControlMessage;
use moqtail::model::control::constant::DRAFT_14;
use moqtail::model::error::TerminationCode;
use moqtail::transport::control_stream_handler::ControlStreamHandler;
use std::sync::Arc;
use tokio::sync::RwLock;
use wtransport::{Endpoint, Identity, ServerConfig};
/// A relay server instance.
///
/// Owns the `RelayState`; `run` hands a clone of the `Arc` to each connection
/// task it spawns.
struct Relay {
/// Relay state shared by all connection tasks.
state: Arc<RelayState>,
}
impl Relay {
    /// Creates a relay with empty state.
    fn new() -> Self {
        Self {
            state: Arc::new(RelayState::new()),
        }
    }

    /// Binds the relay to `0.0.0.0:4433` and serves clients until an error
    /// occurs while setting up the endpoint.
    ///
    /// Every incoming session is served on its own Tokio task. A failure while
    /// establishing or serving one session is logged to stderr and ends only
    /// that task: a misbehaving or disconnecting client must not panic a task
    /// or affect other clients.
    ///
    /// # Errors
    ///
    /// Returns an error if the self-signed identity cannot be created, the
    /// bind address cannot be parsed, the endpoint cannot be created, or its
    /// local address cannot be read. Once the accept loop is running this
    /// function does not return.
    async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
        let identity = Identity::self_signed(std::iter::once("localhost"))?;
        let config = ServerConfig::builder()
            .with_bind_address("0.0.0.0:4433".parse()?)
            .with_identity(identity)
            .build();

        let endpoint = Endpoint::server(config)?;
        println!("Relay listening on {}", endpoint.local_addr()?);

        loop {
            let incoming = endpoint.accept().await;
            let state = Arc::clone(&self.state);

            tokio::spawn(async move {
                // Network handshakes fail routinely (client aborts, timeouts),
                // so each step logs and returns instead of unwrapping.
                let session_request = match incoming.await {
                    Ok(request) => request,
                    Err(e) => {
                        eprintln!("Failed to receive session request: {:?}", e);
                        return;
                    }
                };

                let connection = match session_request.accept().await {
                    Ok(connection) => connection,
                    Err(e) => {
                        eprintln!("Failed to accept session: {:?}", e);
                        return;
                    }
                };

                let (send, recv) = match connection.accept_bi().await {
                    Ok(streams) => streams,
                    Err(e) => {
                        eprintln!("Failed to accept control stream: {:?}", e);
                        return;
                    }
                };

                let mut control = ControlStreamHandler::new(send, recv);

                // Handle setup and messages; report failures instead of
                // silently discarding them.
                if let Err(e) = handle_client(&mut control, state).await {
                    eprintln!("Connection error: {:?}", e);
                }
            });
        }
    }
}
async fn handle_client(
control: &mut ControlStreamHandler,
state: Arc<RelayState>,
) -> Result<(), Box<dyn std::error::Error>> {
// Setup handshake (omitted for brevity)
// Message loop
loop {
match control.next_message().await {
Ok(msg) => { /* handle messages */ }
Err(TerminationCode::NoError) => break,
Err(_) => break,
}
}
Ok(())
}
/// Program entry point: builds a `Relay` and runs it.
///
/// # Errors
///
/// Returns whatever error `Relay::run` reports.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Relay::new().run().await
}
Next Steps
Transport Layer
Deep dive into control and data stream handlers
Client Implementation
Build clients that connect to your relay
Protocol Model
Understand MOQT messages and data structures