
Overview

Slung embeds a WebAssembly runtime (zware) that allows you to write custom workflows in any language that compiles to WASM. Workflows have access to a rich set of host functions for querying time series data, writing events, and communicating with external systems.

Architecture

Workflow Execution

Workflows run concurrently with the main server (src/main.zig:394-415):
pub fn handleWasm(allocator: std.mem.Allocator, context: *AppContext) !void {
    var args = try std.process.argsWithAllocator(allocator);
    defer args.deinit();
    
    _ = args.next();
    var wasm_path: ?[]const u8 = null;
    
    while (args.next()) |arg| {
        if (std.mem.eql(u8, arg, "--wasm")) {
            wasm_path = args.next() orelse return error.InvalidArguments;
            continue;
        }
        return error.InvalidArguments;
    }
    
    const path = wasm_path orelse @panic("Set the path to the Wasm file with --wasm <path>");
    const bytes = try std.fs.cwd().readFileAlloc(allocator, path, 64 * 1024 * 1024);
    defer allocator.free(bytes);
    
    const context_ptr = @intFromPtr(context);
    try context.server.notify.wait();  // Wait for server ready
    try execute.spawn(allocator, bytes, context_ptr);
}

WASM Instantiation

Workflows are instantiated with host functions exposed (src/host/execute.zig:9-31):
pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
    var store = Store.init(allocator);
    defer store.deinit();
    
    // Expose Slung host functions
    try host.initHostFunctions(&store, context_ptr);
    
    var module = Module.init(allocator, bytes);
    defer module.deinit();
    try module.decode();
    
    var instance = Instance.init(allocator, &store, module);
    defer instance.deinit();
    try instance.instantiate();
    
    // Invoke the workflow's entry point
    var input = [0]u64{};
    var output = [1]u64{0};
    try instance.invoke("call", &input, &output, .{
        .frame_stack_size = 4096,
        .label_stack_size = 4096,
        .operand_stack_size = 16384,
    });
}

Host Functions

Slung exposes 7 host functions to WASM modules (src/host/host.zig:9-24):

Query Functions

u_query_live

Register a persistent query that receives real-time updates.
Signature:
(func $u_query_live (param $filter_ptr i32) (result i64))
Parameters:
  • filter_ptr: Pointer to Region containing query filter string
Returns:
  • Query handle (u64) for polling, or 0 on error
Filter Format:
OP:SERIES:[TAGS]:[RANGE]
Examples:
AVG:cpu.usage:[host=server-01 AND region=us-west]:[1m,now]
SUM:requests:[service=api]:[5m,now]
COUNT:errors:[severity=critical]
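A guest workflow might assemble filter strings with a small helper rather than by hand. The following is a minimal sketch that only mirrors the OP:SERIES:[TAGS]:[RANGE] shape and the examples shown above; `build_filter` is a hypothetical name, and treating the range as optional is an assumption based on the third example.

```rust
/// Build a filter string in the `OP:SERIES:[TAGS]:[RANGE]` shape.
/// Tag pairs are joined with " AND "; the range segment is omitted when `None`.
/// Hypothetical helper: it does not validate the operator or the tag syntax.
pub fn build_filter(
    op: &str,
    series: &str,
    tags: &[(&str, &str)],
    range: Option<(&str, &str)>,
) -> String {
    let tag_expr = tags
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .collect::<Vec<_>>()
        .join(" AND ");

    let mut filter = format!("{op}:{series}:[{tag_expr}]");
    if let Some((start, end)) = range {
        filter.push_str(&format!(":[{start},{end}]"));
    }
    filter
}
```

The resulting string still has to be placed in guest memory and wrapped in a Region before it is passed to u_query_live or u_query_history.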
Implementation (host.zig:74-92):
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);
    
    // Parse and register query
    const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);
    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(u64, 0);
        return;
    };
    context.server.queries.put(query_id, query) catch {
        try vm.pushOperand(u64, 0);
        return;
    };
    
    try vm.pushOperand(u64, query_id);
}

u_poll_handle

Poll a query handle for new events.
Signature:
(func $u_poll_handle (param $handle i64) (result i32))
Parameters:
  • handle: Query handle from u_query_live
Returns:
  • Pointer to Region containing JSON event, or 0 if no event
Event JSON Format:
{
  "timestamp": 1700000000,
  "value": 42.5,
  "tags": [],
  "producers": [12345]
}
If no event is available, u_poll_handle yields to the scheduler before returning 0, so other tasks can run. This prevents a polling loop from busy-waiting.
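On the guest side, the JSON bytes read from the returned Region need to be parsed. A real workflow would probably use a JSON library; the sketch below is a deliberately minimal, dependency-free scanner that pulls one numeric field out of a flat object like the one shown above. `json_number` is a hypothetical helper, and it assumes the key appears once and is not nested or escaped.

```rust
/// Extract a numeric field from a flat JSON object.
/// Minimal on purpose: it finds the first `"key"`, skips the colon, and
/// parses the run of number characters that follows. It does not handle
/// escapes, nesting, or duplicate keys.
pub fn json_number(json: &str, key: &str) -> Option<f64> {
    let needle = format!("\"{key}\"");
    let after_key = &json[json.find(&needle)? + needle.len()..];
    let after_colon = after_key.trim_start().strip_prefix(':')?.trim_start();

    // The number ends at the first character that cannot be part of one.
    let end = after_colon
        .find(|c: char| !(c.is_ascii_digit() || matches!(c, '-' | '+' | '.' | 'e' | 'E')))
        .unwrap_or(after_colon.len());

    after_colon[..end].parse().ok()
}
```

Non-numeric fields such as tags yield `None`, so callers can treat a missing or malformed value the same way.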
Implementation (host.zig:26-58):
pub fn u_poll_handle(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const handle_u64 = vm.popOperand(u64);
    const handle = std.math.cast(u32, handle_u64) orelse {
        try vm.pushOperand(u32, 0);
        return;
    };
    
    context.server.connections_mutex.lock();
    const event = context.server.query_events.fetchRemove(handle);
    context.server.connections_mutex.unlock();
    
    if (event == null) {
        zio.yield() catch {};  // Yield to scheduler
        try vm.pushOperand(u32, 0);
        return;
    }
    
    // Serialize event to JSON
    const json_bytes = pollEventJson(context.io.allocator, event.?.value) catch {
        try vm.pushOperand(u32, 0);
        return;
    };
    defer context.io.allocator.free(json_bytes);
    
    const memory = try vm.inst.getMemory(0);
    var api = Api.init(vm.inst);
    const region_ptr = allocateAndWriteRegion(&api, memory, json_bytes) catch {
        try vm.pushOperand(u32, 0);
        return;
    };
    
    try vm.pushOperand(u32, region_ptr);
}

u_free_handle

Free a query handle when done.
Signature:
(func $u_free_handle (param $handle i64) (result i32))
Parameters:
  • handle: Query handle to free
Returns:
  • 0 on success, 1 on error
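One way to make sure a handle is always released is to tie it to a value's lifetime. The sketch below is an assumption about how a Rust guest might do that, not part of Slung: `HandleGuard` is a hypothetical type, and the `free` closure stands in for whatever binding the guest has to u_free_handle.

```rust
/// Frees a live-query handle when the guard goes out of scope.
/// `free` is a stand-in for a binding to `u_free_handle` (hypothetical).
pub struct HandleGuard<F: FnMut(u64) -> i32> {
    handle: u64,
    free: F,
}

impl<F: FnMut(u64) -> i32> HandleGuard<F> {
    pub fn new(handle: u64, free: F) -> Self {
        Self { handle, free }
    }

    /// The raw handle, for passing to the poll function.
    pub fn handle(&self) -> u64 {
        self.handle
    }
}

impl<F: FnMut(u64) -> i32> Drop for HandleGuard<F> {
    fn drop(&mut self) {
        // The status code (0 = success, 1 = error) is ignored here because
        // a destructor has no good way to report a failed free.
        let _ = (self.free)(self.handle);
    }
}
```

A workflow that never leaves its polling loop will not drop the guard, so this mainly helps with short-lived queries and early returns.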

u_query_history

Execute a one-time aggregation query over historical data.
Signature:
(func $u_query_history (param $filter_ptr i32) (result f64))
Parameters:
  • filter_ptr: Pointer to Region containing query filter
Returns:
  • Aggregated value (f64), or 0 on error. A legitimate aggregate of 0 cannot be distinguished from a failure.
Implementation (host.zig:94-142):
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);
    
    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    
    // Convert query op to QueryOp enum
    const op: QueryOp = switch (query.op) {
        .Avg => QueryOp.AVG,
        .Sum => QueryOp.SUM,
        .Max => QueryOp.MAX,
        .Min => QueryOp.MIN,
        .Count => QueryOp.COUNT,
        .None => {
            try vm.pushOperand(f64, 0);
            return;
        },
    };
    
    // Find matching series
    const series_keys = context.server.matchingSeriesKeysForQuery(
        context.io.allocator, 
        &query
    ) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    defer context.io.allocator.free(series_keys);
    
    // Query and aggregate
    const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
    const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);
    const value = aggregateHistory(context, series_keys, start, end, op) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    
    try vm.pushOperand(f64, value.Float);
}

Write Functions

u_write_event

Write a time series event from the workflow.
Signature:
(func $u_write_event 
  (param $timestamp_ptr i32) 
  (param $value_ptr i32) 
  (param $tags_ptr i32) 
  (result i32))
Parameters:
  • timestamp_ptr: Region pointer to timestamp string (microseconds)
  • value_ptr: Region pointer to value string (float)
  • tags_ptr: Region pointer to comma-separated tags
Returns:
  • 0 on success, 1 on error
Tags Format:
series=<name>,<tag1>,<tag2>,...
Example:
series=alerts,severity=high,service=api,host=server-01
The tags string must include series=<name>, which identifies the series the event is written to.
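Because all three arguments travel as strings, a guest has to serialize the timestamp, value, and tags before wrapping them in Regions. The sketch below shows one possible encoding that follows the formats described above. `EventArgs` and `encode_event` are hypothetical names, and the code does not check that individual tags are well formed.

```rust
/// The three string arguments `u_write_event` expects, before they are
/// wrapped in Regions. Hypothetical type for illustration.
pub struct EventArgs {
    pub timestamp: String,
    pub value: String,
    pub tags: String,
}

/// Serialize an event: a base-10 microsecond timestamp, a float value, and
/// a comma-separated tag list that always starts with `series=<name>`.
pub fn encode_event(
    timestamp_micros: i64,
    value: f64,
    series: &str,
    tags: &[&str],
) -> EventArgs {
    let mut tag_str = format!("series={series}");
    for tag in tags {
        tag_str.push(',');
        tag_str.push_str(tag);
    }
    EventArgs {
        timestamp: timestamp_micros.to_string(),
        value: value.to_string(),
        tags: tag_str,
    }
}
```

Putting series= first is a choice made here for readability; the text above only requires that it be present.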
Implementation (host.zig:144-236):
pub fn u_write_event(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const tags_ptr = vm.popOperand(u32);
    const value_ptr = vm.popOperand(u32);
    const timestamp_ptr = vm.popOperand(u32);
    
    const memory = try vm.inst.getMemory(0);
    
    // Read parameters
    const timestamp_bytes = readRegion(memory, timestamp_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const value_bytes = readRegion(memory, value_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const tags_bytes = readRegion(memory, tags_ptr) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Parse timestamp and value
    const timestamp = std.fmt.parseInt(i64, std.mem.trim(u8, timestamp_bytes, " \t\r\n"), 10) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    const value = std.fmt.parseFloat(f64, std.mem.trim(u8, value_bytes, " \t\r\n")) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Parse tags
    var tags = std.ArrayList([]const u8).empty;
    defer tags.deinit(context.io.allocator);
    
    const parsed = parseWriteEventTags(tags_bytes, context.io.allocator, &tags) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Build series key
    var key_scratch = std.ArrayList(u8).empty;
    defer key_scratch.deinit(context.io.allocator);
    const series_key = context.server.resolveSeriesKey(&key_scratch, parsed.series, parsed.tags) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Insert into TSM tree
    context.server.tree.insert(series_key, .{
        .timestamp = timestamp,
        .value = .{ .Float = value },
    }) catch {
        try vm.pushOperand(u32, 1);
        return;
    };
    
    // Update live queries
    context.server.connections_mutex.lock();
    defer context.server.connections_mutex.unlock();
    
    var iter_queries = context.server.queries.iterator();
    while (iter_queries.next()) |entry| {
        const query_id = entry.key_ptr.*;
        const q = entry.value_ptr;
        
        // Check if query matches this event
        if (!std.mem.eql(u8, q.series, parsed.series)) continue;
        if (!q.matchesTags(parsed.tags)) continue;
        if (q.has_time_range and (timestamp < q.time_start or timestamp > q.time_end)) continue;
        
        // Update query state
        switch (q.op) {
            .Avg => {
                q.*.op.Avg.count += 1;
                q.*.op.Avg.sum += value;
            },
            .Sum => q.*.op.Sum += value,
            .Count => q.*.op.Count += 1,
            .Max => if (value > q.*.op.Max) q.*.op.Max = value,
            .Min => if (value < q.*.op.Min) q.*.op.Min = value,
            .None => {},
        }
        
        // Queue event for polling
        context.server.query_events.put(query_id, .{
            .timestamp = timestamp,
            .value = value,
            .producer = 0,
        }) catch {};
    }
    
    try vm.pushOperand(u32, 0);
}

Communication Functions

u_writeback_http

Send an HTTP request to an external service.
Signature:
(func $u_writeback_http 
  (param $url_ptr i32) 
  (param $data_ptr i32) 
  (param $method i32) 
  (result i32))
Parameters:
  • url_ptr: Region pointer to URL string
  • data_ptr: Region pointer to request body
  • method: HTTP method (0=GET, 1=POST, 2=PUT, 3=DELETE)
Returns:
  • Region pointer to response body, or 0 on error
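The numeric method codes are easy to mix up at call sites. A guest could name them with a small enum, as in this sketch; `HttpMethod` is a hypothetical type whose discriminants simply restate the table above.

```rust
/// Method codes accepted by `u_writeback_http`, as listed above.
/// Hypothetical guest-side type; only the numeric values come from the docs.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(i32)]
pub enum HttpMethod {
    Get = 0,
    Post = 1,
    Put = 2,
    Delete = 3,
}

impl HttpMethod {
    /// The i32 value to pass as the `method` parameter.
    pub fn code(self) -> i32 {
        self as i32
    }
}
```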

u_writeback_ws

Send a message to a WebSocket client.
Signature:
(func $u_writeback_ws 
  (param $producer i64) 
  (param $data_ptr i32) 
  (result i32))
Parameters:
  • producer: Producer ID from query event
  • data_ptr: Region pointer to message data
Returns:
  • 0 on success, 1 on error
The producer ID comes from the producers array in poll events. This allows workflows to send responses back to the client that triggered the event.

Memory Management

Region Structure

Data is passed between host and guest using Region structures:
// Region: 12 bytes
struct Region {
    offset: u32,    // Offset in WASM memory
    capacity: u32,  // Allocated capacity
    length: u32,    // Actual data length
}
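To make the 12-byte layout concrete, here is a sketch of how a Rust guest might model a Region and resolve it against a memory image. It assumes the three fields are stored as little-endian u32 values in the order shown (WebAssembly linear memory is little-endian). The method names are hypothetical, and the bounds checks are a defensive choice of this sketch rather than a description of the host code.

```rust
/// Guest-side mirror of the 12-byte Region layout described above.
#[repr(C)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Region {
    pub offset: u32,
    pub capacity: u32,
    pub length: u32,
}

impl Region {
    pub const SIZE: usize = 12;

    /// Serialize as three little-endian u32 fields.
    pub fn to_bytes(self) -> [u8; 12] {
        let mut out = [0u8; 12];
        out[0..4].copy_from_slice(&self.offset.to_le_bytes());
        out[4..8].copy_from_slice(&self.capacity.to_le_bytes());
        out[8..12].copy_from_slice(&self.length.to_le_bytes());
        out
    }

    /// Parse the first 12 bytes of `bytes`; `None` if there are too few.
    pub fn from_bytes(bytes: &[u8]) -> Option<Region> {
        if bytes.len() < Self::SIZE {
            return None;
        }
        let word = |i: usize| {
            u32::from_le_bytes([bytes[i], bytes[i + 1], bytes[i + 2], bytes[i + 3]])
        };
        Some(Region { offset: word(0), capacity: word(4), length: word(8) })
    }

    /// Resolve the region against a memory image, rejecting regions whose
    /// length exceeds their capacity or that run past the end of memory.
    pub fn slice<'a>(&self, memory: &'a [u8]) -> Option<&'a [u8]> {
        if self.length > self.capacity {
            return None;
        }
        let start = self.offset as usize;
        let end = start.checked_add(self.length as usize)?;
        memory.get(start..end)
    }
}
```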

Reading from Guest Memory

fn readRegion(memory: *zware.Memory, region_ptr: u32) ![]const u8 {
    const data = memory.memory();
    
    // Read Region struct at region_ptr
    const offset = try memory.read(u32, region_ptr, 0);
    const length = try memory.read(u32, region_ptr, 8);
    
    return data[offset .. offset + length];
}

Writing to Guest Memory

fn allocateAndWriteRegion(
    api: *Api, 
    memory: *zware.Memory, 
    data_bytes: []const u8
) !u32 {
    // Allocate space for data
    const data_offset: u32 = @intCast(try api.allocate(@intCast(data_bytes.len)));
    
    // Write data bytes
    for (data_bytes, 0..) |byte, i| {
        try memory.write(u8, data_offset + @as(u32, @intCast(i)), 0, byte);
    }
    
    // Allocate Region struct (12 bytes)
    const region_ptr: u32 = @intCast(try api.allocate(12));
    
    // Write Region fields
    try memory.write(u32, region_ptr, 0, data_offset);  // offset
    try memory.write(u32, region_ptr, 4, @intCast(data_bytes.len));  // capacity
    try memory.write(u32, region_ptr, 8, @intCast(data_bytes.len));  // length
    
    return region_ptr;
}

Guest API Functions

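WASM modules must export allocate and deallocate functions. The host calls them through this wrapper:
pub const Api = struct {
    const Self = @This();

    // Field and init are inferred from the call sites above
    // (Api.init(vm.inst) and self.instance).
    instance: *zware.Instance,

    pub fn init(instance: *zware.Instance) Self {
        return .{ .instance = instance };
    }

    // Invoke the guest's exported `allocate` and return the offset it yields.
    pub fn allocate(self: *Self, param0: i32) !i32 {
        var in = [_]u64{@bitCast(@as(i64, param0))};
        var out = [_]u64{0};
        try self.instance.invoke("allocate", in[0..], out[0..], .{});
        return @bitCast(@as(u32, @truncate(out[0])));
    }

    // Invoke the guest's exported `deallocate`; it returns no value.
    pub fn deallocate(self: *Self, param0: i32) !void {
        var in = [_]u64{@bitCast(@as(i64, param0))};
        var out = [_]u64{};
        try self.instance.invoke("deallocate", in[0..], out[0..], .{});
    }
};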
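On the guest side, the two exports need an allocator behind them. The sketch below is one possible shape, under stated assumptions: `deallocate` receives the same pointer that `allocate` returned (the text above does not say whether it is a data pointer or a Region pointer), and the allocation size is remembered in a hidden header because `deallocate` takes no size. `guest_allocate` and `guest_deallocate` are hypothetical names; a real module would wrap them in exported functions named `allocate` and `deallocate`.

```rust
use std::alloc::{alloc, dealloc, Layout};

// Bytes reserved in front of every allocation to remember its total size.
const HEADER: usize = 8;
const ALIGN: usize = 8;

/// Allocate `size` usable bytes. Returns null on overflow or allocation failure.
pub fn guest_allocate(size: usize) -> *mut u8 {
    let Some(total) = size.checked_add(HEADER) else {
        return std::ptr::null_mut();
    };
    let Ok(layout) = Layout::from_size_align(total, ALIGN) else {
        return std::ptr::null_mut();
    };
    // SAFETY: `layout` has non-zero size because HEADER > 0, and the header
    // write stays inside the freshly allocated, 8-byte-aligned block.
    unsafe {
        let base = alloc(layout);
        if base.is_null() {
            return base;
        }
        (base as *mut usize).write(total);
        base.add(HEADER)
    }
}

/// Free a pointer previously returned by `guest_allocate`.
///
/// # Safety
/// `ptr` must be null or come from `guest_allocate`, and must not be freed twice.
pub unsafe fn guest_deallocate(ptr: *mut u8) {
    if ptr.is_null() {
        return;
    }
    // SAFETY: per the contract above, the header sits HEADER bytes before
    // `ptr` and holds the total size used for the original layout.
    unsafe {
        let base = ptr.sub(HEADER);
        let total = (base as *const usize).read();
        dealloc(base, Layout::from_size_align_unchecked(total, ALIGN));
    }
}
```

The header costs 8 bytes per allocation; a bump or arena allocator would avoid that at the price of not reclaiming individual buffers.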

Example Workflow

Real-Time Alerting

This workflow monitors CPU usage and sends alerts when thresholds are exceeded:
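// Pseudo-code (would compile to WASM). String arguments stand in for Region
// pointers; a real guest wraps each buffer in a Region before calling the host.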
#[no_mangle]
pub extern "C" fn call() {
    // Register live query
    let query = "AVG:cpu.usage:[host=server-01]:[1m,now]";
    let handle = u_query_live(query);
    
    loop {
        // Poll for events
        if let Some(event) = u_poll_handle(handle) {
            // Parse event JSON
            let value = event.value;
            
            // Check threshold
            if value > 80.0 {
                // Write alert event
                let timestamp = now_micros().to_string();
                let alert_value = value.to_string();
                let tags = String::from("series=alerts,severity=high,service=cpu-monitor");
                
                u_write_event(&timestamp, &alert_value, &tags);
                
                // Send HTTP notification
                let url = "https://alerts.example.com/webhook";
                let body = format!(r#"{{"cpu":{},"host":"server-01"}}"#, value);
                u_writeback_http(url, &body, 1); // POST
            }
        }
    }
}

Data Enrichment

Enrich incoming events with external data:
#[no_mangle]
pub extern "C" fn call() {
    let query = "COUNT:requests:[service=api]:[10s,now]";
    let handle = u_query_live(query);
    
    loop {
        if let Some(event) = u_poll_handle(handle) {
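            // Fetch enrichment data. `event.host` is shorthand for a host value
            // taken from the event's tags; the event JSON has no host field.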
            let url = format!("https://api.example.com/host/{}", event.host);
            let metadata = u_writeback_http(&url, "", 0); // GET
            
            // Parse metadata and write enriched event
            // ...
        }
    }
}

WASI Support

Slung provides minimal WASI support for compatibility:
  • fd_write: Write to stdout/stderr
  • environ_get: Get environment variables
  • environ_sizes_get: Get the number and total size of environment variables
  • proc_exit: Exit process
Full WASI support is not implemented. File I/O, networking, and other WASI functions are not available. Use Slung host functions instead.

Performance Considerations

Yielding

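Yield when no events are available so that a polling workflow does not busy-wait. u_poll_handle does this on the host side:
if (event == null) {
    zio.yield() catch {};  // Yield to other tasks
    try vm.pushOperand(u32, 0);  // Report "no event" to the guest
    return;
}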

Memory Allocation

Minimize allocations in hot paths:
  • Reuse buffers when possible
  • Use stack allocation for small data
  • Deallocate Regions after use
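As an illustration of buffer reuse, a guest that formats a request body on every event can keep one String alive and overwrite it instead of allocating a new one each time. This is a sketch in guest-side Rust; `format_alert` is a hypothetical helper and the JSON shape is borrowed from the alerting example in this document.

```rust
use std::fmt::Write;

/// Format an alert body into `buf`, reusing its existing allocation.
pub fn format_alert(buf: &mut String, host: &str, cpu: f64) {
    // `clear` drops the old contents but keeps the capacity.
    buf.clear();
    // Writing into a String cannot fail, so the Result is ignored.
    let _ = write!(buf, r#"{{"cpu":{cpu},"host":"{host}"}}"#);
}
```

As long as the formatted body fits within the buffer's capacity, repeated calls do not touch the allocator.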

Query Optimization

  • Use specific tag filters to reduce series scan
  • Limit time ranges to avoid scanning all history
  • Prefer u_query_history for one-time queries
  • Free handles when done with u_free_handle

Debugging

Workflows can write debug output using WASI fd_write:
(func $log (param $msg_ptr i32) (param $msg_len i32)
  ;; Write to stdout (fd=1)
  (local $iov i32)
  (local $written i32)
  
  ;; Build iovec structure
  ;; ...
  
  (call $fd_write (i32.const 1) (local.get $iov) (i32.const 1) (local.get $written))
)
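The elided "build iovec structure" step comes down to writing two 32-bit values into guest memory. In WASI preview1, each ciovec entry is a buffer pointer followed by a buffer length, both little-endian u32. The sketch below shows that encoding in Rust; `encode_iovec` is a hypothetical helper, and where the 8 bytes are placed in memory is left to the caller.

```rust
/// Encode one WASI preview1 `ciovec` entry: the buffer pointer followed by
/// the buffer length, each a little-endian u32 (8 bytes in total).
pub fn encode_iovec(buf_ptr: u32, buf_len: u32) -> [u8; 8] {
    let mut iov = [0u8; 8];
    iov[0..4].copy_from_slice(&buf_ptr.to_le_bytes());
    iov[4..8].copy_from_slice(&buf_len.to_le_bytes());
    iov
}
```

fd_write then takes the address of this entry, the entry count (1), and the address of a u32 that receives the number of bytes written.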

Limitations

  • Single workflow: Only one WASM module can run at a time
  • No sandboxing: Workflows have full access to the database
  • No CPU limits: Long-running computations will block other operations
  • 64MB size limit: WASM modules are capped at 64MB

Next Steps

Architecture Overview

Understand the full system architecture

Query DSL

Learn the query filter syntax
