Skip to main content

Endpoint

GET /
Upgrades an HTTP connection to a WebSocket for streaming time-series data points.

Connection Upgrade

Request

wscat -c ws://0.0.0.0:2077/
Headers:
Connection: Upgrade
Upgrade: websocket

Response

Status Code: 101 Switching Protocols The server assigns a unique connection ID and begins accepting binary frames.

Message Format

Binary Frame Structure

Slung expects binary WebSocket frames with the following little-endian format:
[timestamp:i64][value:f64][series_len:u16][tag_count:u16][series][tag_len:u16+tag_bytes]...
timestamp
int64
required
Unix timestamp in nanoseconds (signed 64-bit integer, little-endian)
value
float64
required
Measurement value (64-bit floating point, little-endian)
series_len
uint16
required
Length of the series name in bytes (little-endian)
tag_count
uint16
required
Number of tags attached to this data point (little-endian)
series
bytes
required
UTF-8 encoded series name (length specified by series_len)
tags
bytes[]
required
Array of tags, each prefixed with u16 length in little-endian

Example: Single Data Point

Sending a temperature reading:
  • Timestamp: 1709481600000000000 (nanoseconds)
  • Value: 23.5
  • Series: "temperature"
  • Tags: ["sensor=living_room", "unit=celsius"]
Binary breakdown:
OffsetBytesFieldValue
0-78timestamp0x00 0x00 0xC4 0xB6 0xE3 0x5C 0xB8 0x17
8-158value0x00 0x00 0x00 0x00 0x00 0x80 0x37 0x40
16-172series_len0x0B 0x00 (11)
18-192tag_count0x02 0x00 (2)
20-3011series"temperature"
31-322tag_len0x13 0x00 (19)
33-5119tag"sensor=living_room"
52-532tag_len0x0C 0x00 (12)
54-6512tag"unit=celsius"

Implementation Details

Connection Handling

From src/main.zig:328-358:
fn handleWebSocket(context: *AppContext, req: *http.Request, res: *http.Response) !void {
    var websocket = try res.upgradeWebSocket(req) orelse {
        try handleIndexPost(context, req, res);
        return;
    };
    const id = try context.server.addConnection(&websocket);
    errdefer websocket.close(.internal_error, "handler error") catch {};
    errdefer context.server.removeConnection(id);
    defer context.server.removeConnection(id);

    while (true) {
        const msg = websocket.receive() catch |err| switch (err) {
            error.EndOfStream => break,
            else => return err,
        };

        switch (msg.type) {
            .text => {
                handleMessage(msg, id, context) catch return;
            },
            .binary => {
                handleMessage(msg, id, context) catch return;
            },
            .close => {
                std.log.info("Client closed connection", .{});
                break;
            },
            else => {},
        }
    }
}

Message Decoding

From src/main.zig:424-469:
fn decodeBinaryMessage(allocator: std.mem.Allocator, data: []const u8, tag_scratch: *std.ArrayList([]const u8)) !DecodedMessage {
    // little-endian:
    // [timestamp:i64][value:f64][series_len:u16][tag_count:u16][series][tag_len:u16+tag_bytes]...
    const header_len = 8 + 8 + 2 + 2;
    if (data.len < header_len) return error.InvalidMessage;

    var offset: usize = 0;
    const timestamp_bits = std.mem.readInt(u64, data[offset..][0..8], .little);
    const timestamp: i64 = @bitCast(timestamp_bits);
    offset += 8;

    const value_bits = std.mem.readInt(u64, data[offset..][0..8], .little);
    const value: f64 = @bitCast(value_bits);
    offset += 8;

    const series_len = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
    offset += 2;
    const tag_count = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
    offset += 2;

    if (offset + series_len > data.len) return error.InvalidMessage;
    const series = data[offset .. offset + series_len];
    offset += series_len;

    tag_scratch.clearRetainingCapacity();
    try tag_scratch.ensureTotalCapacity(allocator, tag_count);

    var i: usize = 0;
    while (i < tag_count) : (i += 1) {
        if (offset + 2 > data.len) return error.InvalidMessage;
        const tag_len = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
        offset += 2;
        if (offset + tag_len > data.len) return error.InvalidMessage;
        try tag_scratch.append(allocator, data[offset .. offset + tag_len]);
        offset += tag_len;
    }

    if (offset != data.len) return error.InvalidMessage;

    return .{
        .timestamp = timestamp,
        .value = value,
        .series = series,
        .tags = tag_scratch.items,
    };
}

Error Handling

Invalid Message Format

If a message cannot be decoded, the server logs a warning and continues:
Ignoring websocket payload; expected binary [i64 timestamp][f64 value][u16 series_len][u16 tag_count][series][tags...]
The connection remains open for subsequent messages.

Connection Termination

Connections close when:
  • Client sends a WebSocket close frame
  • Client disconnects (end of stream)
  • Internal channel closes (error.ChannelClosed)
  • Operation is canceled (error.Canceled)

Fallback Behavior

If the WebSocket upgrade fails at GET /, the server falls back to handling it as POST /, which returns:Status: 400 Bad Request
{
  "status": "ok",
  "info": "use websocket binary frames to stream data"
}

Client Examples

Python Client

import struct
import websocket

def send_datapoint(ws, timestamp, value, series, tags):
    # Pack header
    message = struct.pack('<qd', timestamp, value)
    
    # Pack series
    series_bytes = series.encode('utf-8')
    message += struct.pack('<HH', len(series_bytes), len(tags))
    message += series_bytes
    
    # Pack tags
    for tag in tags:
        tag_bytes = tag.encode('utf-8')
        message += struct.pack('<H', len(tag_bytes))
        message += tag_bytes
    
    ws.send(message, websocket.ABNF.OPCODE_BINARY)

ws = websocket.create_connection("ws://0.0.0.0:2077/")
send_datapoint(
    ws,
    timestamp=1709481600000000000,
    value=23.5,
    series="temperature",
    tags=["sensor=living_room", "unit=celsius"]
)
ws.close()

Rust Client

use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() {
    let (ws, _) = connect_async("ws://0.0.0.0:2077/").await.unwrap();
    
    let mut message = Vec::new();
    message.extend_from_slice(&1709481600000000000i64.to_le_bytes());
    message.extend_from_slice(&23.5f64.to_le_bytes());
    message.extend_from_slice(&11u16.to_le_bytes()); // series_len
    message.extend_from_slice(&2u16.to_le_bytes());  // tag_count
    message.extend_from_slice(b"temperature");
    message.extend_from_slice(&19u16.to_le_bytes());
    message.extend_from_slice(b"sensor=living_room");
    message.extend_from_slice(&12u16.to_le_bytes());
    message.extend_from_slice(b"unit=celsius");
    
    ws.send(Message::Binary(message)).await.unwrap();
}

Performance Considerations

  • Connection pooling: Each WebSocket connection gets a unique ID. The server supports multiple concurrent connections via CHANNEL_CAPACITY = 8192 * 2
  • Frame batching: Send multiple data points in separate frames rather than concatenating into one large frame
  • Binary vs Text: Both binary and text frames are accepted, but binary is preferred for efficiency

Build docs developers (and LLMs) love