Endpoint
Upgrades an HTTP connection to a WebSocket for streaming time-series data points.
Connection Upgrade
Request
wscat -c ws://0.0.0.0:2077/
Headers:
Connection: Upgrade
Upgrade: websocket
Response
Status Code: 101 Switching Protocols
The server assigns a unique connection ID and begins accepting binary frames.
Binary Frame Structure
Slung expects binary WebSocket frames with the following little-endian format:
[timestamp:i64][value:f64][series_len:u16][tag_count:u16][series][tag_len:u16+tag_bytes]...
Unix timestamp in nanoseconds (signed 64-bit integer, little-endian)
Measurement value (64-bit floating point, little-endian)
Length of the series name in bytes (little-endian)
Number of tags attached to this data point (little-endian)
UTF-8 encoded series name (length specified by series_len)
Array of tags, each prefixed with u16 length in little-endian
Example: Single Data Point
Sending a temperature reading:
- Timestamp:
1709481600000000000 (nanoseconds)
- Value:
23.5
- Series:
"temperature"
- Tags:
["sensor=living_room", "unit=celsius"]
Binary breakdown:
| Offset | Bytes | Field | Value |
|---|
| 0-7 | 8 | timestamp | 0x00 0x00 0xC4 0xB6 0xE3 0x5C 0xB8 0x17 |
| 8-15 | 8 | value | 0x00 0x00 0x00 0x00 0x00 0x80 0x37 0x40 |
| 16-17 | 2 | series_len | 0x0B 0x00 (11) |
| 18-19 | 2 | tag_count | 0x02 0x00 (2) |
| 20-30 | 11 | series | "temperature" |
| 31-32 | 2 | tag_len | 0x13 0x00 (19) |
| 33-51 | 19 | tag | "sensor=living_room" |
| 52-53 | 2 | tag_len | 0x0C 0x00 (12) |
| 54-65 | 12 | tag | "unit=celsius" |
Implementation Details
Connection Handling
From src/main.zig:328-358:
fn handleWebSocket(context: *AppContext, req: *http.Request, res: *http.Response) !void {
var websocket = try res.upgradeWebSocket(req) orelse {
try handleIndexPost(context, req, res);
return;
};
const id = try context.server.addConnection(&websocket);
errdefer websocket.close(.internal_error, "handler error") catch {};
errdefer context.server.removeConnection(id);
defer context.server.removeConnection(id);
while (true) {
const msg = websocket.receive() catch |err| switch (err) {
error.EndOfStream => break,
else => return err,
};
switch (msg.type) {
.text => {
handleMessage(msg, id, context) catch return;
},
.binary => {
handleMessage(msg, id, context) catch return;
},
.close => {
std.log.info("Client closed connection", .{});
break;
},
else => {},
}
}
}
Message Decoding
From src/main.zig:424-469:
fn decodeBinaryMessage(allocator: std.mem.Allocator, data: []const u8, tag_scratch: *std.ArrayList([]const u8)) !DecodedMessage {
// little-endian:
// [timestamp:i64][value:f64][series_len:u16][tag_count:u16][series][tag_len:u16+tag_bytes]...
const header_len = 8 + 8 + 2 + 2;
if (data.len < header_len) return error.InvalidMessage;
var offset: usize = 0;
const timestamp_bits = std.mem.readInt(u64, data[offset..][0..8], .little);
const timestamp: i64 = @bitCast(timestamp_bits);
offset += 8;
const value_bits = std.mem.readInt(u64, data[offset..][0..8], .little);
const value: f64 = @bitCast(value_bits);
offset += 8;
const series_len = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
offset += 2;
const tag_count = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
offset += 2;
if (offset + series_len > data.len) return error.InvalidMessage;
const series = data[offset .. offset + series_len];
offset += series_len;
tag_scratch.clearRetainingCapacity();
try tag_scratch.ensureTotalCapacity(allocator, tag_count);
var i: usize = 0;
while (i < tag_count) : (i += 1) {
if (offset + 2 > data.len) return error.InvalidMessage;
const tag_len = @as(usize, std.mem.readInt(u16, data[offset..][0..2], .little));
offset += 2;
if (offset + tag_len > data.len) return error.InvalidMessage;
try tag_scratch.append(allocator, data[offset .. offset + tag_len]);
offset += tag_len;
}
if (offset != data.len) return error.InvalidMessage;
return .{
.timestamp = timestamp,
.value = value,
.series = series,
.tags = tag_scratch.items,
};
}
Error Handling
If a message cannot be decoded, the server logs a warning and continues:
Ignoring websocket payload; expected binary [i64 timestamp][f64 value][u16 series_len][u16 tag_count][series][tags...]
The connection remains open for subsequent messages.
Connection Termination
Connections close when:
- Client sends a WebSocket close frame
- Client disconnects (end of stream)
- Internal channel closes (
error.ChannelClosed)
- Operation is canceled (
error.Canceled)
Fallback Behavior
If the WebSocket upgrade fails at GET /, the server falls back to handling it as POST /, which returns:Status: 400 Bad Request{
"status": "ok",
"info": "use websocket binary frames to stream data"
}
Client Examples
Python Client
import struct
import websocket
def send_datapoint(ws, timestamp, value, series, tags):
# Pack header
message = struct.pack('<qd', timestamp, value)
# Pack series
series_bytes = series.encode('utf-8')
message += struct.pack('<HH', len(series_bytes), len(tags))
message += series_bytes
# Pack tags
for tag in tags:
tag_bytes = tag.encode('utf-8')
message += struct.pack('<H', len(tag_bytes))
message += tag_bytes
ws.send(message, websocket.ABNF.OPCODE_BINARY)
ws = websocket.create_connection("ws://0.0.0.0:2077/")
send_datapoint(
ws,
timestamp=1709481600000000000,
value=23.5,
series="temperature",
tags=["sensor=living_room", "unit=celsius"]
)
ws.close()
Rust Client
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() {
let (ws, _) = connect_async("ws://0.0.0.0:2077/").await.unwrap();
let mut message = Vec::new();
message.extend_from_slice(&1709481600000000000i64.to_le_bytes());
message.extend_from_slice(&23.5f64.to_le_bytes());
message.extend_from_slice(&11u16.to_le_bytes()); // series_len
message.extend_from_slice(&2u16.to_le_bytes()); // tag_count
message.extend_from_slice(b"temperature");
message.extend_from_slice(&19u16.to_le_bytes());
message.extend_from_slice(b"sensor=living_room");
message.extend_from_slice(&12u16.to_le_bytes());
message.extend_from_slice(b"unit=celsius");
ws.send(Message::Binary(message)).await.unwrap();
}
- Connection pooling: Each WebSocket connection gets a unique ID. The server supports multiple concurrent connections via
CHANNEL_CAPACITY = 8192 * 2
- Frame batching: Send multiple data points in separate frames rather than concatenating into one large frame
- Binary vs Text: Both binary and text frames are accepted, but binary is preferred for efficiency