ServerMux<W> is a type alias for Multiplexor<ServerImpl<W>, W> that represents the server side of a Wisp connection. It performs the server-side handshake—sending the initial CONTINUE packet and, if requested, the INFO packet for Wisp v2—and then delivers incoming streams to your application one at a time through the wait_for_stream method. Each stream arrives paired with a ConnectPacket that describes the remote host, port, and transport type the client requested.

Creating a ServerMux

pub async fn new<R: TransportRead>(
    rx: R,
    tx: W,
    buffer_size: u32,
    wisp_v2: Option<WispV2Handshake>,
) -> Result<MuxResult<ServerImpl<W>, W>, WispError>

| Parameter | Description |
| --- | --- |
| rx | Read half of the WebSocket transport (anything implementing TransportRead). |
| tx | Write half of the WebSocket transport (anything implementing TransportWrite). |
| buffer_size | Flow-control window advertised to the client in the CONTINUE packet on stream ID 0. Controls how many packets the client may send before waiting for credit. |
| wisp_v2 | None for a v1-only server; Some(WispV2Handshake::new(builders)) to advertise v2 and negotiate extensions. |

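new returns a MuxResult. Call with_no_required_extensions() on it to obtain the (ServerMux, MultiplexorActorFuture) pair, or with_required_extensions(&[ids]) to obtain the same pair only if the client advertised every listed extension ID; otherwise it fails with ExtensionsNotSupported. In both cases you must then spawn the actor future or await it concurrently with your own code.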
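
To make the buffer_size parameter concrete, the following is a minimal, self-contained sketch of credit-based flow control. It does not use wisp_mux: encode_continue and SendCredit are hypothetical names, and the byte layout (type byte 0x03, little-endian stream ID, little-endian buffer count) is assumed from the Wisp v1 packet format rather than taken from this crate's source.

```rust
/// Packet type byte for CONTINUE (assumed from the Wisp v1 packet format).
const PACKET_TYPE_CONTINUE: u8 = 0x03;

/// Hypothetical helper: encode a CONTINUE packet as
/// [type: u8][stream_id: u32 LE][buffer_remaining: u32 LE].
fn encode_continue(stream_id: u32, buffer_remaining: u32) -> Vec<u8> {
    let mut out = Vec::with_capacity(9);
    out.push(PACKET_TYPE_CONTINUE);
    out.extend_from_slice(&stream_id.to_le_bytes());
    out.extend_from_slice(&buffer_remaining.to_le_bytes());
    out
}

/// Hypothetical client-side view of the send window for one stream.
struct SendCredit {
    remaining: u32,
}

impl SendCredit {
    /// Start with the window advertised in the initial CONTINUE packet.
    fn new(buffer_size: u32) -> Self {
        Self { remaining: buffer_size }
    }

    /// Spend one credit for an outgoing DATA packet.
    /// Returns false when the sender has to wait for another CONTINUE.
    fn try_send(&mut self) -> bool {
        if self.remaining == 0 {
            return false;
        }
        self.remaining -= 1;
        true
    }

    /// A CONTINUE packet resets the window to the advertised value.
    fn on_continue(&mut self, buffer_remaining: u32) {
        self.remaining = buffer_remaining;
    }
}
```

Under these assumptions, a larger buffer_size lets the client keep more packets in flight before it stalls, at the cost of more buffering on the server.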

Accepting incoming streams

After construction, call wait_for_stream in a loop. It returns None only when the actor future has ended (the underlying WebSocket closed or close was called):
pub async fn wait_for_stream(&self) -> Option<(ConnectPacket, MuxStream<W>)>
ConnectPacket carries:
  • stream_type: StreamType, either StreamType::Tcp or StreamType::Udp
  • host: String — the target hostname or IP the client wants to reach
  • port: u16 — the target port
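
As a rough illustration of where those three fields come from, the sketch below parses a CONNECT payload by hand. It is not the crate's parser: StreamKind, ConnectInfo, and parse_connect_payload are hypothetical stand-ins, and the layout (one stream-type byte, a little-endian port, then the UTF-8 host) is assumed from the Wisp v1 packet format.

```rust
/// Stand-in for the stream type carried in a CONNECT packet.
#[derive(Debug, PartialEq)]
enum StreamKind {
    Tcp,
    Udp,
}

/// Stand-in for the fields listed above on ConnectPacket.
#[derive(Debug, PartialEq)]
struct ConnectInfo {
    stream_type: StreamKind,
    host: String,
    port: u16,
}

/// Parse a CONNECT payload laid out as
/// [stream_type: u8][port: u16 LE][host: UTF-8 bytes].
/// Returns None for short input, unknown stream types, or invalid UTF-8.
fn parse_connect_payload(payload: &[u8]) -> Option<ConnectInfo> {
    if payload.len() < 3 {
        return None;
    }
    let stream_type = match payload[0] {
        0x01 => StreamKind::Tcp,
        0x02 => StreamKind::Udp,
        _ => return None,
    };
    let port = u16::from_le_bytes([payload[1], payload[2]]);
    let host = std::str::from_utf8(&payload[3..]).ok()?.to_owned();
    Some(ConnectInfo { stream_type, host, port })
}
```

The real ConnectPacket may accept stream types this sketch rejects; the point is only to show how host, port, and stream_type map onto the bytes a client sends.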
MuxStream<W> is the bidirectional stream handle. It implements Stream<Item = Result<Payload, WispError>> for reading and Sink<Payload> for writing. It can also be split into independent halves or converted to a futures::AsyncRead + AsyncWrite + AsyncBufRead value via into_async_rw().

Minimal TCP-forwarding server

The example below accepts WebSocket connections with tokio-websockets, wraps each one in a ServerMux, and for every incoming stream opens a real TCP connection and proxies data in both directions:
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use wisp_mux::{
    packet::{CloseReason, StreamType},
    ws::{TokioWebsocketsTransport, TransportExt},
    ServerMux, WispError,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Listen for plain TCP connections that will be upgraded to WebSocket
    let listener = TcpListener::bind("0.0.0.0:4000").await?;
    println!("Wisp server listening on :4000");

    loop {
        let (tcp, _peer) = listener.accept().await?;

        tokio::spawn(async move {
            if let Err(e) = handle_connection(tcp).await {
                eprintln!("connection error: {e}");
            }
        });
    }
}

async fn handle_connection(
    tcp: TcpStream,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Upgrade TCP → WebSocket
    let (_request, ws) = tokio_websockets::ServerBuilder::new()
        .accept(tcp)
        .await?;

    let (rx, tx) = TokioWebsocketsTransport(ws).split_fast();

    // Create the ServerMux (Wisp v1, buffer_size = 128)
    let (mux, actor_fut) = ServerMux::new(rx, tx, 128, None)
        .await?
        .with_no_required_extensions();

    // Spawn the actor
    tokio::spawn(actor_fut);

    // Accept streams
    while let Some((connect_pkt, stream)) = mux.wait_for_stream().await {
        tokio::spawn(async move {
            if let Err(e) = forward_stream(connect_pkt, stream).await {
                eprintln!("stream error: {e}");
            }
        });
    }

    Ok(())
}

async fn forward_stream(
    connect_pkt: wisp_mux::packet::ConnectPacket,
    stream: wisp_mux::stream::MuxStream<impl wisp_mux::ws::TransportWrite>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    if connect_pkt.stream_type != StreamType::Tcp {
        // UDP forwarding requires the UDP extension on both sides
        stream
            .close(CloseReason::ServerStreamInvalidInfo)
            .await?;
        return Ok(());
    }

    let target = format!("{}:{}", connect_pkt.host, connect_pkt.port);
    let tcp = TcpStream::connect(&target).await?;
    let (mut tcp_rx, mut tcp_tx) = tcp.into_split();

    let (mut wisp_rx, mut wisp_tx) = stream.into_split();

    // Wisp → TCP
    let wisp_to_tcp = tokio::spawn(async move {
        while let Some(chunk) = wisp_rx.next().await {
            let data = chunk?;
            tcp_tx.write_all(&data).await?;
        }
        Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
    });

    // TCP → Wisp
    let tcp_to_wisp = tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        loop {
            let n = tcp_rx.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            wisp_tx
                .feed(Bytes::copy_from_slice(&buf[..n]))
                .await?;
        }
        wisp_tx.flush().await?;
        Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
    });

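    // Return as soon as either direction finishes. Dropping the other JoinHandle
    // detaches that task rather than cancelling it, so it runs until its side closes.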
    tokio::select! {
        res = wisp_to_tcp => { res??; }
        res = tcp_to_wisp  => { res??; }
    }

    Ok(())
}
The actor future must be running (spawned or awaited concurrently) before you call wait_for_stream. If the actor exits, wait_for_stream will return None and your accept loop will terminate.
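
The dependency between the actor and the accept loop can be modelled with plain threads and a channel. This is only an analogy under stated assumptions: run_model is a hypothetical function, the thread stands in for the spawned actor future, and the channel receiver stands in for wait_for_stream. How wisp_mux wires this internally is not shown here.

```rust
use std::sync::mpsc;
use std::thread;

/// Model of the actor / accept-loop relationship.
/// The "actor" thread owns the sending side of a channel; the accept loop
/// reads from the receiving side and stops once the actor is gone.
fn run_model(incoming: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::channel::<u32>();

    // Stand-in for spawning the actor future: it delivers stream IDs and
    // then exits, dropping `tx` on the way out.
    let actor = thread::spawn(move || {
        for stream_id in incoming {
            if tx.send(stream_id).is_err() {
                break;
            }
        }
    });

    // Stand-in for `while let Some(..) = mux.wait_for_stream().await`.
    // `recv` fails once the sender is dropped, which ends the loop.
    let mut accepted = Vec::new();
    while let Ok(stream_id) = rx.recv() {
        accepted.push(stream_id);
    }

    actor.join().expect("actor thread panicked");
    accepted
}
```

If the actor thread were never started, nothing would ever arrive on the channel, which mirrors why the real actor future has to be polled for wait_for_stream to yield anything.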

Using into_split and into_async_rw

MuxStream provides two convenience converters:
// Independent read/write halves (implement Stream and Sink respectively)
let (read_half, write_half) = stream.into_split();

// A single futures::AsyncRead + AsyncWrite + AsyncBufRead value
let async_rw = stream.into_async_rw();
MuxStreamRead and MuxStreamWrite each expose get_stream_id() -> u32 and get_close_reason() -> Option<CloseReason> so you can identify and inspect streams after splitting. MuxStreamWrite additionally exposes:
  • close(reason: CloseReason) -> Result<(), WispError> — sends a CLOSE packet and prevents further writes.
  • get_close_handle() -> MuxStreamCloser<W> — a Clone-able handle that can close the stream from a separate task.

Wisp v2 server setup

Pass a WispV2Handshake to ServerMux::new to advertise extensions to connecting clients. The server sends the INFO packet first; if the client also replies with an INFO packet, the intersection of both sides’ extension lists is used.
use wisp_mux::{
    extensions::{
        motd::{MotdProtocolExtensionBuilder, MotdProtocolExtension},
        password::PasswordProtocolExtensionBuilder,
        udp::UdpProtocolExtensionBuilder,
        AnyProtocolExtensionBuilder,
    },
    ServerMux, WispV2Handshake,
};

let extensions: Vec<AnyProtocolExtensionBuilder> = vec![
    AnyProtocolExtensionBuilder::new(UdpProtocolExtensionBuilder),
    // Serve a message-of-the-day to connecting clients
    AnyProtocolExtensionBuilder::new(
        MotdProtocolExtensionBuilder::Server("Welcome to the Wisp server!".to_string())
    ),
    // Require password authentication from clients
    AnyProtocolExtensionBuilder::new(
        PasswordProtocolExtensionBuilder::new_server(|username, password| {
            Box::pin(async move {
                // Return Ok(()) to accept, Err(WispError::PasswordExtensionCredsInvalid) to reject
                if username == "alice" && password == "hunter2" {
                    Ok(())
                } else {
                    Err(wisp_mux::WispError::PasswordExtensionCredsInvalid)
                }
            })
        })
    ),
];

let (mux, actor_fut) = ServerMux::new(rx, tx, 128, Some(WispV2Handshake::new(extensions)))
    .await?
    .with_no_required_extensions();
After the handshake completes, inspect what was actually agreed upon:
println!("negotiated extensions: {:?}", mux.get_extension_ids());
println!("client downgraded to v1: {}", mux.was_downgraded());
When using the password or certificate authentication extensions on the server, a failed handshake causes ServerMux::new to return an error and automatically sends the appropriate close packet (ExtensionsPasswordAuthFailed or ExtensionsCertAuthFailed) before closing the WebSocket. You do not need to send a close packet yourself.
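
The negotiation described in this section (take the intersection of both sides' extension lists, then check any required IDs) can be sketched without the crate. The functions negotiate and missing_required are hypothetical, and the numeric IDs in the usage are placeholders, not the IDs any real extension uses.

```rust
use std::collections::BTreeSet;

/// Extension IDs that both sides advertised, in ascending order.
fn negotiate(server_ids: &[u8], client_ids: &[u8]) -> Vec<u8> {
    let server: BTreeSet<u8> = server_ids.iter().copied().collect();
    let client: BTreeSet<u8> = client_ids.iter().copied().collect();
    server.intersection(&client).copied().collect()
}

/// Required IDs that are absent from the negotiated set.
/// An empty result means every requirement was met.
fn missing_required(negotiated: &[u8], required: &[u8]) -> Vec<u8> {
    required
        .iter()
        .copied()
        .filter(|id| !negotiated.contains(id))
        .collect()
}
```

With a server advertising IDs 1, 2, and 4 and a client advertising 4, 1, and 9, negotiate yields 1 and 4; requiring ID 2 on top of that would leave 2 in the missing list, which is the situation the page describes as ExtensionsNotSupported.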

Closing the server-side connection

// Close all streams gracefully (no reason code on stream ID 0)
mux.close().await?;

// Close with an explicit reason code
use wisp_mux::packet::CloseReason;
mux.close_with_reason(CloseReason::Unknown).await?;
Both calls signal the actor future to terminate, which in turn drops all open streams.

Error handling

| Variant | When it occurs on the server |
| --- | --- |
| WsImplSocketClosed | Client disconnected before the handshake or actor completed. |
| WsImplError(e) | Transport-level I/O error from the WebSocket backend. |
| MuxMessageFailedToSend | Internal channel broke; typically the actor task panicked or was aborted. |
| StreamAlreadyClosed | A write was attempted on a stream the client already sent a CLOSE for. |
| PasswordExtensionCredsInvalid | The client supplied wrong credentials during v2 extension handshake. |
| CertAuthExtensionSigInvalid | The client’s Ed25519 certificate signature failed verification. |
| CertAuthExtensionNoKey | The server-side builder has no signing key configured. |
| ExtensionsNotSupported(ids) | Returned by with_required_extensions if the client did not advertise the listed IDs. |
| IncompatibleProtocolVersion(found, needed) | Client sent an INFO packet claiming a version the server cannot handle. |
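
One possible way to act on these variants is to group them by who is at fault before logging. The sketch below is only an illustration: ServerError is a stand-in enum with simplified payloads (it is not wisp_mux::WispError), and the Severity grouping is a hypothetical policy, not something the crate prescribes.

```rust
/// Stand-in for the variants in the table above; payloads are simplified.
#[derive(Debug)]
enum ServerError {
    WsImplSocketClosed,
    WsImplError(String),
    MuxMessageFailedToSend,
    StreamAlreadyClosed,
    PasswordExtensionCredsInvalid,
    CertAuthExtensionSigInvalid,
    CertAuthExtensionNoKey,
    ExtensionsNotSupported(Vec<u8>),
    IncompatibleProtocolVersion,
}

/// Hypothetical logging policy for a server operator.
#[derive(Debug, PartialEq)]
enum Severity {
    /// Ordinary disconnects and races; usually not worth more than a debug log.
    Routine,
    /// The client failed authentication or negotiation.
    ClientRejected,
    /// Something on the server side needs attention.
    ServerFault,
}

fn classify(err: &ServerError) -> Severity {
    match err {
        ServerError::WsImplSocketClosed | ServerError::StreamAlreadyClosed => Severity::Routine,
        ServerError::PasswordExtensionCredsInvalid
        | ServerError::CertAuthExtensionSigInvalid
        | ServerError::ExtensionsNotSupported(_)
        | ServerError::IncompatibleProtocolVersion => Severity::ClientRejected,
        ServerError::WsImplError(_)
        | ServerError::MuxMessageFailedToSend
        | ServerError::CertAuthExtensionNoKey => Severity::ServerFault,
    }
}
```

In a real server the same match would be written against WispError, whose variants carry richer payloads than this stand-in.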
