Skip to main content

Overview

Slung executes user-defined functions in WebAssembly for real-time stream processing. The WASM runtime provides isolation, portability, and safe execution of custom logic over time-series data.

Runtime Architecture

WASM Engine

Slung uses zware, a Zig-based WebAssembly interpreter:
pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
    var store = Store.init(allocator);
    defer store.deinit();
    
    try host.initHostFunctions(&store, context_ptr);
    
    var module = Module.init(allocator, bytes);
    defer module.deinit();
    try module.decode();
    
    var instance = Instance.init(allocator, &store, module);
    defer instance.deinit();
    try instance.instantiate();
    
    var input = [0]u64{};
    var output = [1]u64{0};
    
    try instance.invoke("call", &input, &output, .{
        .frame_stack_size = 4096,
        .label_stack_size = 4096,
        .operand_stack_size = 16384,
    });
}
Stack configuration:
  • Frame stack: 4096 entries
  • Label stack: 4096 entries
  • Operand stack: 16384 entries

Execution Flow

  1. Load WASM module bytes
  2. Initialize store with host functions
  3. Decode WASM module
  4. Instantiate module
  5. Invoke call() entry point
  6. Cleanup and deinit

Host Functions

Slung exposes 9 host functions to WASM modules (src/host/host.zig):

Query Functions

u_query_live - Register live query
try store.exposeHostFunction(
    "env", 
    "u_query_live", 
    u_query_live, 
    context_ptr, 
    &[_]zware.ValType{.I32},  // filter_ptr
    &[_]zware.ValType{.I64}   // returns query_id
);
Registers a query that tracks incoming events matching the filter:
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);
    
    const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);
    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(u64, 0);
        return;
    };
    
    context.server.queries.put(query_id, query) catch {
        try vm.pushOperand(u64, 0);
        return;
    };
    
    try vm.pushOperand(u64, query_id);
}
u_query_history - Query historical data
try store.exposeHostFunction(
    "env", 
    "u_query_history", 
    u_query_history, 
    context_ptr, 
    &[_]zware.ValType{.I32},  // filter_ptr
    &[_]zware.ValType{.F64}   // returns aggregated value
);
Executes aggregation query over historical data:
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);
    
    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    
    // Map operation and execute
    var op: QueryOp = /* map from query.op */;
    const series_keys = context.server.matchingSeriesKeysForQuery(context.io.allocator, &query) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    defer context.io.allocator.free(series_keys);
    
    const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
    const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);
    const value = aggregateHistory(context, series_keys, start, end, op) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    
    try vm.pushOperand(f64, value.Float);
}
u_poll_handle - Poll for query events
try store.exposeHostFunction(
    "env", 
    "u_poll_handle", 
    u_poll_handle, 
    context_ptr, 
    &[_]zware.ValType{.I64},  // query_id
    &[_]zware.ValType{.I32}   // returns event_ptr or 0
);
Polls for new events matching a live query:
pub fn u_poll_handle(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const handle_u64 = vm.popOperand(u64);
    const handle = std.math.cast(u32, handle_u64) orelse {
        try vm.pushOperand(u32, 0);
        return;
    };
    
    context.server.connections_mutex.lock();
    const event = context.server.query_events.fetchRemove(handle);
    context.server.connections_mutex.unlock();
    
    if (event == null) {
        zio.yield() catch {};
        try vm.pushOperand(u32, 0);
        return;
    }
    
    const json_bytes = pollEventJson(context.io.allocator, event.?.value) catch {
        try vm.pushOperand(u32, 0);
        return;
    };
    defer context.io.allocator.free(json_bytes);
    
    const memory = try vm.inst.getMemory(0);
    var api = Api.init(vm.inst);
    const region_ptr = allocateAndWriteRegion(&api, memory, json_bytes) catch {
        try vm.pushOperand(u32, 0);
        return;
    };
    
    try vm.pushOperand(u32, region_ptr);
}
Returns JSON event:
{"timestamp":1234567890,"value":42.5,"tags":[],"producers":[1]}
u_free_handle - Free query handle
try store.exposeHostFunction(
    "env", 
    "u_free_handle", 
    u_free_handle, 
    context_ptr, 
    &[_]zware.ValType{.I64},  // query_id
    &[_]zware.ValType{.I32}   // returns 0 on success
);

Write Functions

u_write_event - Write data point
try store.exposeHostFunction(
    "env", 
    "u_write_event", 
    u_write_event, 
    context_ptr, 
    &[_]zware.ValType{ .I32, .I32, .I32 },  // tags_ptr, value_ptr, timestamp_ptr
    &[_]zware.ValType{.I32}                  // returns 0 on success
);
Inserts a data point into the TSM tree:
pub fn u_write_event(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const tags_ptr = vm.popOperand(u32);
    const value_ptr = vm.popOperand(u32);
    const timestamp_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    
    const timestamp_bytes = readRegion(memory, timestamp_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const value_bytes = readRegion(memory, value_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const tags_bytes = readRegion(memory, tags_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    const timestamp = std.fmt.parseInt(i64, std.mem.trim(u8, timestamp_bytes, " \t\r\n"), 10) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const value = std.fmt.parseFloat(f64, std.mem.trim(u8, value_bytes, " \t\r\n")) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    var tags = std.ArrayList([]const u8).empty;
    defer tags.deinit(context.io.allocator);
    
    const parsed = parseWriteEventTags(tags_bytes, context.io.allocator, &tags) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    var key_scratch = std.ArrayList(u8).empty;
    defer key_scratch.deinit(context.io.allocator);
    const series_key = context.server.resolveSeriesKey(&key_scratch, parsed.series, parsed.tags) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Insert into TSM tree
    context.server.tree.insert(series_key, .{
        .timestamp = timestamp,
        .value = .{ .Float = value },
    }) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Update live queries
    context.server.connections_mutex.lock();
    defer context.server.connections_mutex.unlock();
    
    var iter_queries = context.server.queries.iterator();
    while (iter_queries.next()) |entry| {
        const query_id = entry.key_ptr.*;
        const q = entry.value_ptr;
        if (!std.mem.eql(u8, q.series, parsed.series)) continue;
        if (!q.matchesTags(parsed.tags)) continue;
        if (q.has_time_range and (timestamp < q.time_start or timestamp > q.time_end)) continue;
        
        // Update aggregation state
        switch (q.op) {
            .Avg => {
                q.*.op.Avg.count += 1;
                q.*.op.Avg.sum += value;
            },
            .Sum => q.*.op.Sum += value,
            .Count => q.*.op.Count += 1,
            .Max => if (value > q.*.op.Max) q.*.op.Max = value,
            .Min => if (value < q.*.op.Min) q.*.op.Min = value,
            .None => {},
        }
        
        context.server.query_events.put(query_id, .{
            .timestamp = timestamp,
            .value = value,
            .producer = 0,
        }) catch {};
    }
    
    try vm.pushOperand(u32, 0);
}

Write-back Functions

u_writeback_http - HTTP callback
try store.exposeHostFunction(
    "env", 
    "u_writeback_http", 
    u_writeback_http, 
    context_ptr, 
    &[_]zware.ValType{ .I32, .I32, .I32 },  // method, data_ptr, url_ptr
    &[_]zware.ValType{.I32}                  // returns response_ptr
);
Makes HTTP requests:
pub fn u_writeback_http(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    var client = http.Client.init(context.io.allocator, .{});
    defer client.deinit();
    
    const method = vm.popOperand(u32);
    const data_ptr = vm.popOperand(u32);
    const url_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    
    const url_bytes = try readRegion(memory, url_ptr);
    const data_bytes = try readRegion(memory, data_ptr);
    
    switch (method) {
        0 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .get),
        1 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .post),
        2 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .put),
        3 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .delete),
        else => {
            try vm.pushOperand(u32, 0);
            return;
        },
    }
}
HTTP methods:
  • 0 = GET
  • 1 = POST
  • 2 = PUT
  • 3 = DELETE
u_writeback_ws - WebSocket callback
try store.exposeHostFunction(
    "env", 
    "u_writeback_ws", 
    u_writeback_ws, 
    context_ptr, 
    &[_]zware.ValType{ .I64, .I32 },  // producer_id, data_ptr
    &[_]zware.ValType{.I32}            // returns 0 on success
);
Sends data to WebSocket connection:
pub fn u_writeback_ws(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    
    const data_ptr = vm.popOperand(u32);
    const producer = vm.popOperand(u64);
    
    const memory = try vm.inst.getMemory(0);
    const data_bytes = try readRegion(memory, data_ptr);
    
    if (context.server.connections.get(producer)) |connection| {
        connection.send(.text, data_bytes) catch {
            context.server.connections_mutex.lock();
            defer context.server.connections_mutex.unlock();
            _ = context.server.connections.remove(producer);
            try vm.pushOperand(u32, 1);
            return;
        };
    } else {
        try vm.pushOperand(u32, 1);
        return;
    }
    
    try vm.pushOperand(u32, 0);
}

WASI Functions

Standard WASI preview1 functions:
try store.exposeHostFunction("wasi_snapshot_preview1", "fd_write", zware.wasi.fd_write, 0, &[_]zware.ValType{ .I32, .I32, .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "environ_get", zware.wasi.environ_get, 0, &[_]zware.ValType{ .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "environ_sizes_get", zware.wasi.environ_sizes_get, 0, &[_]zware.ValType{ .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "proc_exit", zware.wasi.proc_exit, 0, &[_]zware.ValType{.I32}, &[_]zware.ValType{});

Memory Interface

Host functions use a Region-based memory interface:

Region Structure

// Region struct in WASM memory: 12 bytes
struct Region {
    offset: u32,    // offset in WASM linear memory
    capacity: u32,  // allocated capacity
    length: u32,    // actual data length
}

Reading from WASM Memory

fn readRegion(memory: *zware.Memory, region_ptr: u32) ![]const u8 {
    const data = memory.memory();
    
    // Read Region struct: { offset: u32, capacity: u32, length: u32 }
    const offset = try memory.read(u32, region_ptr, 0);
    const length = try memory.read(u32, region_ptr, 8);
    
    return data[offset .. offset + length];
}

Writing to WASM Memory

fn allocateAndWriteRegion(api: *Api, memory: *zware.Memory, data_bytes: []const u8) !u32 {
    // Allocate space for data
    const data_offset: u32 = @intCast(try api.allocate(@intCast(data_bytes.len)));
    
    // Write the data to guest memory
    for (data_bytes, 0..) |byte, i| {
        try memory.write(u8, data_offset + @as(u32, @intCast(i)), 0, byte);
    }
    
    // Allocate space for the Region struct (12 bytes)
    const region_ptr: u32 = @intCast(try api.allocate(12));
    
    // Write the Region struct: { offset, capacity, length }
    try memory.write(u32, region_ptr, 0, data_offset);
    try memory.write(u32, region_ptr, 4, @intCast(data_bytes.len));
    try memory.write(u32, region_ptr, 8, @intCast(data_bytes.len));
    
    return region_ptr;
}
WASM modules must export allocate and deallocate functions for memory management.

Lifecycle Management

Spawning Execution

From src/host/execute.zig:
pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
    var store = Store.init(allocator);
    defer store.deinit();
    
    // Inject host functions with context
    try host.initHostFunctions(&store, context_ptr);
    
    var module = Module.init(allocator, bytes);
    defer module.deinit();
    try module.decode();
    
    var instance = Instance.init(allocator, &store, module);
    defer instance.deinit();
    try instance.instantiate();
    
    // Invoke main entry point
    var input = [0]u64{};
    var output = [1]u64{0};
    
    try instance.invoke("call", &input, &output, .{
        .frame_stack_size = 4096,
        .label_stack_size = 4096,
        .operand_stack_size = 16384,
    });
}

Context Passing

Host functions receive a context pointer to access the AppContext:
pub fn initHostFunctions(store: *zware.Store, context_ptr: usize) !void {
    try store.exposeHostFunction("env", "u_query_live", u_query_live, context_ptr, ...);
    // ...
}

pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    // Access server, tree, connections, etc.
}

Use Cases

Real-time Alerting

// WASM function polls live query and sends alerts
loop {
    event = u_poll_handle(query_id);
    if (event.value > threshold) {
        u_writeback_http(POST, alert_url, json_payload);
    }
}

Anomaly Detection

// Compare live data to historical baseline
avg_30d = u_query_history("AVG:cpu.usage:[host=x]:[30d,now]");
loop {
    event = u_poll_handle(query_id);
    if (event.value > avg_30d * 3.0) {
        u_writeback_ws(producer_id, "Anomaly detected");
    }
}

Stream Aggregation

// Aggregate and downsample
loop {
    event = u_poll_handle(query_id);
    buffer.append(event);
    if (buffer.len >= 1000) {
        avg = buffer.average();
        u_write_event(tags, avg, now());
        buffer.clear();
    }
}

Performance Considerations

  • Interpretation overhead: zware is an interpreter, not a JIT compiler
  • Memory isolation: Separate linear memory space
  • Host call cost: Context switching between WASM and native code
  • Async yielding: zio.yield() when polling empty queues
  • Stack limits: Configured for typical workloads, can be adjusted

Build docs developers (and LLMs) love