Documentation Index
Fetch the complete documentation index at: https://mintlify.com/MercuryWorkshop/epoxy-tls/llms.txt
Use this file to discover all available pages before exploring further.
ServerMux<W> is a type alias for Multiplexor<ServerImpl<W>, W> that represents the server side of a Wisp connection. It performs the server-side handshake—sending the initial CONTINUE packet and, if requested, the INFO packet for Wisp v2—and then delivers incoming streams to your application one at a time through the wait_for_stream method. Each stream arrives paired with a ConnectPacket that describes the remote host, port, and transport type the client requested.
Creating a ServerMux
pub async fn new<R: TransportRead>(
rx: R,
tx: W,
buffer_size: u32,
wisp_v2: Option<WispV2Handshake>,
) -> Result<MuxResult<ServerImpl<W>, W>, WispError>
| Parameter | Description |
|---|
rx | Read half of the WebSocket transport (anything implementing TransportRead). |
tx | Write half of the WebSocket transport (anything implementing TransportWrite). |
buffer_size | Flow-control window advertised to the client in the CONTINUE packet on stream ID 0. Controls how many packets the client may send before waiting for credit. |
wisp_v2 | None for a v1-only server; Some(WispV2Handshake::new(builders)) to advertise v2 and negotiate extensions. |
new returns a MuxResult. Call either with_no_required_extensions() or with_required_extensions(&[ids]) on it to obtain the (ServerMux, MultiplexorActorFuture) pair. You must then spawn or await the actor future.
Accepting incoming streams
After construction, call wait_for_stream in a loop. It returns None only when the actor future has ended (the underlying WebSocket closed or close was called):
pub async fn wait_for_stream(&self) -> Option<(ConnectPacket, MuxStream<W>)>
ConnectPacket carries:
stream_type: StreamType — StreamType::Tcp or StreamType::Udp
host: String — the target hostname or IP the client wants to reach
port: u16 — the target port
MuxStream<W> is the bidirectional stream handle. It implements Stream<Item = Result<Payload, WispError>> for reading and Sink<Payload> for writing. It can also be split into independent halves or converted to a futures::AsyncRead + AsyncWrite + AsyncBufRead value via into_async_rw().
Minimal TCP-forwarding server
The example below accepts WebSocket connections with tokio-websockets, wraps each one in a ServerMux, and for every incoming stream opens a real TCP connection and proxies data in both directions:
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use wisp_mux::{
packet::{CloseReason, StreamType},
ws::{TokioWebsocketsTransport, TransportExt},
ServerMux, WispError,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Listen for plain TCP connections that will be upgraded to WebSocket
let listener = TcpListener::bind("0.0.0.0:4000").await?;
println!("Wisp server listening on :4000");
loop {
let (tcp, _peer) = listener.accept().await?;
tokio::spawn(async move {
if let Err(e) = handle_connection(tcp).await {
eprintln!("connection error: {e}");
}
});
}
}
async fn handle_connection(
tcp: TcpStream,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Upgrade TCP → WebSocket
let (_request, ws) = tokio_websockets::ServerBuilder::new()
.accept(tcp)
.await?;
let (rx, tx) = TokioWebsocketsTransport(ws).split_fast();
// Create the ServerMux (Wisp v1, buffer_size = 128)
let (mux, actor_fut) = ServerMux::new(rx, tx, 128, None)
.await?
.with_no_required_extensions();
// Spawn the actor
tokio::spawn(actor_fut);
// Accept streams
while let Some((connect_pkt, stream)) = mux.wait_for_stream().await {
tokio::spawn(async move {
if let Err(e) = forward_stream(connect_pkt, stream).await {
eprintln!("stream error: {e}");
}
});
}
Ok(())
}
async fn forward_stream(
connect_pkt: wisp_mux::packet::ConnectPacket,
stream: wisp_mux::stream::MuxStream<impl wisp_mux::ws::TransportWrite>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
if connect_pkt.stream_type != StreamType::Tcp {
// UDP forwarding requires the UDP extension on both sides
stream
.close(CloseReason::ServerStreamInvalidInfo)
.await?;
return Ok(());
}
let target = format!("{}:{}", connect_pkt.host, connect_pkt.port);
let tcp = TcpStream::connect(&target).await?;
let (mut tcp_rx, mut tcp_tx) = tcp.into_split();
let (mut wisp_rx, mut wisp_tx) = stream.into_split();
// Wisp → TCP
let wisp_to_tcp = tokio::spawn(async move {
while let Some(chunk) = wisp_rx.next().await {
let data = chunk?;
tcp_tx.write_all(&data).await?;
}
Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
});
// TCP → Wisp
let tcp_to_wisp = tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
let n = tcp_rx.read(&mut buf).await?;
if n == 0 {
break;
}
wisp_tx
.feed(Bytes::copy_from_slice(&buf[..n]))
.await?;
}
wisp_tx.flush().await?;
Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
});
// Wait for either direction to finish
tokio::select! {
res = wisp_to_tcp => { res??; }
res = tcp_to_wisp => { res??; }
}
Ok(())
}
The actor future must be running (spawned or awaited concurrently) before you call wait_for_stream. If the actor exits, wait_for_stream will return None and your accept loop will terminate.
Using into_split and into_async_rw
MuxStream provides two convenience converters:
// Independent read/write halves (implement Stream and Sink respectively)
let (read_half, write_half) = stream.into_split();
// A single futures::AsyncRead + AsyncWrite + AsyncBufRead value
let async_rw = stream.into_async_rw();
MuxStreamRead and MuxStreamWrite each expose get_stream_id() -> u32 and get_close_reason() -> Option<CloseReason> so you can identify and inspect streams after splitting.
MuxStreamWrite additionally exposes:
close(reason: CloseReason) -> Result<(), WispError> — sends a CLOSE packet and prevents further writes.
get_close_handle() -> MuxStreamCloser<W> — a Clone-able handle that can close the stream from a separate task.
Wisp v2 server setup
Pass a WispV2Handshake to ServerMux::new to advertise extensions to connecting clients. The server sends the INFO packet first; if the client also replies with an INFO packet, the intersection of both sides’ extension lists is used.
use wisp_mux::{
extensions::{
motd::{MotdProtocolExtensionBuilder, MotdProtocolExtension},
password::PasswordProtocolExtensionBuilder,
udp::UdpProtocolExtensionBuilder,
AnyProtocolExtensionBuilder,
},
ServerMux, WispV2Handshake,
};
let extensions: Vec<AnyProtocolExtensionBuilder> = vec![
AnyProtocolExtensionBuilder::new(UdpProtocolExtensionBuilder),
// Serve a message-of-the-day to connecting clients
AnyProtocolExtensionBuilder::new(
MotdProtocolExtensionBuilder::Server("Welcome to the Wisp server!".to_string())
),
// Require password authentication from clients
AnyProtocolExtensionBuilder::new(
PasswordProtocolExtensionBuilder::new_server(|username, password| {
Box::pin(async move {
// Return Ok(()) to accept, Err(WispError::PasswordExtensionCredsInvalid) to reject
if username == "alice" && password == "hunter2" {
Ok(())
} else {
Err(wisp_mux::WispError::PasswordExtensionCredsInvalid)
}
})
})
),
];
let (mux, actor_fut) = ServerMux::new(rx, tx, 128, Some(WispV2Handshake::new(extensions)))
.await?
.with_no_required_extensions();
After the handshake completes, inspect what was actually agreed upon:
println!("negotiated extensions: {:?}", mux.get_extension_ids());
println!("client downgraded to v1: {}", mux.was_downgraded());
When using the password or certificate authentication extensions on the server, a failed handshake causes ServerMux::new to return an error and automatically sends the appropriate close packet (ExtensionsPasswordAuthFailed or ExtensionsCertAuthFailed) before closing the WebSocket. You do not need to send a close packet yourself.
Closing the server-side connection
// Close all streams gracefully (no reason code on stream ID 0)
mux.close().await?;
// Close with an explicit reason code
use wisp_mux::packet::CloseReason;
mux.close_with_reason(CloseReason::Unknown).await?;
Both calls signal the actor future to terminate, which in turn drops all open streams.
Error handling
| Variant | When it occurs on the server |
|---|
WsImplSocketClosed | Client disconnected before the handshake or actor completed. |
WsImplError(e) | Transport-level I/O error from the WebSocket backend. |
MuxMessageFailedToSend | Internal channel broke — typically the actor task panicked or was aborted. |
StreamAlreadyClosed | A write was attempted on a stream the client already sent a CLOSE for. |
PasswordExtensionCredsInvalid | The client supplied wrong credentials during v2 extension handshake. |
CertAuthExtensionSigInvalid | The client’s Ed25519 certificate signature failed verification. |
CertAuthExtensionNoKey | The server-side builder has no signing key configured. |
ExtensionsNotSupported(ids) | Returned by with_required_extensions if the client did not advertise the listed IDs. |
IncompatibleProtocolVersion(found, needed) | Client sent an INFO packet claiming a version the server cannot handle. |