Overview
Slung executes user-defined functions in WebAssembly for real-time stream processing. The WASM runtime provides isolation, portability, and safe execution of custom logic over time-series data.Runtime Architecture
WASM Engine
Slung uses zware, a Zig-based WebAssembly interpreter:pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
var store = Store.init(allocator);
defer store.deinit();
try host.initHostFunctions(&store, context_ptr);
var module = Module.init(allocator, bytes);
defer module.deinit();
try module.decode();
var instance = Instance.init(allocator, &store, module);
defer instance.deinit();
try instance.instantiate();
var input = [0]u64{};
var output = [1]u64{0};
try instance.invoke("call", &input, &output, .{
.frame_stack_size = 4096,
.label_stack_size = 4096,
.operand_stack_size = 16384,
});
}
- Frame stack: 4096 entries
- Label stack: 4096 entries
- Operand stack: 16384 entries
Execution Flow
- Load WASM module bytes
- Initialize store with host functions
- Decode WASM module
- Instantiate module
- Invoke
call()entry point - Cleanup and deinit
Host Functions
Slung exposes 9 host functions to WASM modules (src/host/host.zig):
Query Functions
u_query_live - Register live querytry store.exposeHostFunction(
"env",
"u_query_live",
u_query_live,
context_ptr,
&[_]zware.ValType{.I32}, // filter_ptr
&[_]zware.ValType{.I64} // returns query_id
);
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const filter_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const filter_bytes = try readRegion(memory, filter_ptr);
const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);
const query = Query.init(filter_bytes) catch {
try vm.pushOperand(u64, 0);
return;
};
context.server.queries.put(query_id, query) catch {
try vm.pushOperand(u64, 0);
return;
};
try vm.pushOperand(u64, query_id);
}
try store.exposeHostFunction(
"env",
"u_query_history",
u_query_history,
context_ptr,
&[_]zware.ValType{.I32}, // filter_ptr
&[_]zware.ValType{.F64} // returns aggregated value
);
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const filter_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const filter_bytes = try readRegion(memory, filter_ptr);
const query = Query.init(filter_bytes) catch {
try vm.pushOperand(f64, 0);
return;
};
// Map operation and execute
var op: QueryOp = /* map from query.op */;
const series_keys = context.server.matchingSeriesKeysForQuery(context.io.allocator, &query) catch {
try vm.pushOperand(f64, 0);
return;
};
defer context.io.allocator.free(series_keys);
const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);
const value = aggregateHistory(context, series_keys, start, end, op) catch {
try vm.pushOperand(f64, 0);
return;
};
try vm.pushOperand(f64, value.Float);
}
try store.exposeHostFunction(
"env",
"u_poll_handle",
u_poll_handle,
context_ptr,
&[_]zware.ValType{.I64}, // query_id
&[_]zware.ValType{.I32} // returns event_ptr or 0
);
pub fn u_poll_handle(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const handle_u64 = vm.popOperand(u64);
const handle = std.math.cast(u32, handle_u64) orelse {
try vm.pushOperand(u32, 0);
return;
};
context.server.connections_mutex.lock();
const event = context.server.query_events.fetchRemove(handle);
context.server.connections_mutex.unlock();
if (event == null) {
zio.yield() catch {};
try vm.pushOperand(u32, 0);
return;
}
const json_bytes = pollEventJson(context.io.allocator, event.?.value) catch {
try vm.pushOperand(u32, 0);
return;
};
defer context.io.allocator.free(json_bytes);
const memory = try vm.inst.getMemory(0);
var api = Api.init(vm.inst);
const region_ptr = allocateAndWriteRegion(&api, memory, json_bytes) catch {
try vm.pushOperand(u32, 0);
return;
};
try vm.pushOperand(u32, region_ptr);
}
{"timestamp":1234567890,"value":42.5,"tags":[],"producers":[1]}
try store.exposeHostFunction(
"env",
"u_free_handle",
u_free_handle,
context_ptr,
&[_]zware.ValType{.I64}, // query_id
&[_]zware.ValType{.I32} // returns 0 on success
);
Write Functions
u_write_event - Write data pointtry store.exposeHostFunction(
"env",
"u_write_event",
u_write_event,
context_ptr,
&[_]zware.ValType{ .I32, .I32, .I32 }, // tags_ptr, value_ptr, timestamp_ptr
&[_]zware.ValType{.I32} // returns 0 on success
);
pub fn u_write_event(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const tags_ptr = vm.popOperand(u32);
const value_ptr = vm.popOperand(u32);
const timestamp_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const timestamp_bytes = readRegion(memory, timestamp_ptr) catch {
try vm.pushOperand(u32, 1);
return;
};
const value_bytes = readRegion(memory, value_ptr) catch {
try vm.pushOperand(u32, 1);
return;
};
const tags_bytes = readRegion(memory, tags_ptr) catch {
try vm.pushOperand(u32, 1);
return;
};
const timestamp = std.fmt.parseInt(i64, std.mem.trim(u8, timestamp_bytes, " \t\r\n"), 10) catch {
try vm.pushOperand(u32, 1);
return;
};
const value = std.fmt.parseFloat(f64, std.mem.trim(u8, value_bytes, " \t\r\n")) catch {
try vm.pushOperand(u32, 1);
return;
};
var tags = std.ArrayList([]const u8).empty;
defer tags.deinit(context.io.allocator);
const parsed = parseWriteEventTags(tags_bytes, context.io.allocator, &tags) catch {
try vm.pushOperand(u32, 1);
return;
};
var key_scratch = std.ArrayList(u8).empty;
defer key_scratch.deinit(context.io.allocator);
const series_key = context.server.resolveSeriesKey(&key_scratch, parsed.series, parsed.tags) catch {
try vm.pushOperand(u32, 1);
return;
};
// Insert into TSM tree
context.server.tree.insert(series_key, .{
.timestamp = timestamp,
.value = .{ .Float = value },
}) catch {
try vm.pushOperand(u32, 1);
return;
};
// Update live queries
context.server.connections_mutex.lock();
defer context.server.connections_mutex.unlock();
var iter_queries = context.server.queries.iterator();
while (iter_queries.next()) |entry| {
const query_id = entry.key_ptr.*;
const q = entry.value_ptr;
if (!std.mem.eql(u8, q.series, parsed.series)) continue;
if (!q.matchesTags(parsed.tags)) continue;
if (q.has_time_range and (timestamp < q.time_start or timestamp > q.time_end)) continue;
// Update aggregation state
switch (q.op) {
.Avg => {
q.*.op.Avg.count += 1;
q.*.op.Avg.sum += value;
},
.Sum => q.*.op.Sum += value,
.Count => q.*.op.Count += 1,
.Max => if (value > q.*.op.Max) q.*.op.Max = value,
.Min => if (value < q.*.op.Min) q.*.op.Min = value,
.None => {},
}
context.server.query_events.put(query_id, .{
.timestamp = timestamp,
.value = value,
.producer = 0,
}) catch {};
}
try vm.pushOperand(u32, 0);
}
Write-back Functions
u_writeback_http - HTTP callbacktry store.exposeHostFunction(
"env",
"u_writeback_http",
u_writeback_http,
context_ptr,
&[_]zware.ValType{ .I32, .I32, .I32 }, // method, data_ptr, url_ptr
&[_]zware.ValType{.I32} // returns response_ptr
);
pub fn u_writeback_http(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
var client = http.Client.init(context.io.allocator, .{});
defer client.deinit();
const method = vm.popOperand(u32);
const data_ptr = vm.popOperand(u32);
const url_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const url_bytes = try readRegion(memory, url_ptr);
const data_bytes = try readRegion(memory, data_ptr);
switch (method) {
0 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .get),
1 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .post),
2 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .put),
3 => try sendHttp(vm, memory, &client, url_bytes, data_bytes, .delete),
else => {
try vm.pushOperand(u32, 0);
return;
},
}
}
- 0 = GET
- 1 = POST
- 2 = PUT
- 3 = DELETE
try store.exposeHostFunction(
"env",
"u_writeback_ws",
u_writeback_ws,
context_ptr,
&[_]zware.ValType{ .I64, .I32 }, // producer_id, data_ptr
&[_]zware.ValType{.I32} // returns 0 on success
);
pub fn u_writeback_ws(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const data_ptr = vm.popOperand(u32);
const producer = vm.popOperand(u64);
const memory = try vm.inst.getMemory(0);
const data_bytes = try readRegion(memory, data_ptr);
if (context.server.connections.get(producer)) |connection| {
connection.send(.text, data_bytes) catch {
context.server.connections_mutex.lock();
defer context.server.connections_mutex.unlock();
_ = context.server.connections.remove(producer);
try vm.pushOperand(u32, 1);
return;
};
} else {
try vm.pushOperand(u32, 1);
return;
}
try vm.pushOperand(u32, 0);
}
WASI Functions
Standard WASI preview1 functions:try store.exposeHostFunction("wasi_snapshot_preview1", "fd_write", zware.wasi.fd_write, 0, &[_]zware.ValType{ .I32, .I32, .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "environ_get", zware.wasi.environ_get, 0, &[_]zware.ValType{ .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "environ_sizes_get", zware.wasi.environ_sizes_get, 0, &[_]zware.ValType{ .I32, .I32 }, &[_]zware.ValType{.I32});
try store.exposeHostFunction("wasi_snapshot_preview1", "proc_exit", zware.wasi.proc_exit, 0, &[_]zware.ValType{.I32}, &[_]zware.ValType{});
Memory Interface
Host functions use a Region-based memory interface:Region Structure
// Region struct in WASM memory: 12 bytes
struct Region {
offset: u32, // offset in WASM linear memory
capacity: u32, // allocated capacity
length: u32, // actual data length
}
Reading from WASM Memory
fn readRegion(memory: *zware.Memory, region_ptr: u32) ![]const u8 {
const data = memory.memory();
// Read Region struct: { offset: u32, capacity: u32, length: u32 }
const offset = try memory.read(u32, region_ptr, 0);
const length = try memory.read(u32, region_ptr, 8);
return data[offset .. offset + length];
}
Writing to WASM Memory
fn allocateAndWriteRegion(api: *Api, memory: *zware.Memory, data_bytes: []const u8) !u32 {
// Allocate space for data
const data_offset: u32 = @intCast(try api.allocate(@intCast(data_bytes.len)));
// Write the data to guest memory
for (data_bytes, 0..) |byte, i| {
try memory.write(u8, data_offset + @as(u32, @intCast(i)), 0, byte);
}
// Allocate space for the Region struct (12 bytes)
const region_ptr: u32 = @intCast(try api.allocate(12));
// Write the Region struct: { offset, capacity, length }
try memory.write(u32, region_ptr, 0, data_offset);
try memory.write(u32, region_ptr, 4, @intCast(data_bytes.len));
try memory.write(u32, region_ptr, 8, @intCast(data_bytes.len));
return region_ptr;
}
allocate and deallocate functions for memory management.
Lifecycle Management
Spawning Execution
Fromsrc/host/execute.zig:
pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
var store = Store.init(allocator);
defer store.deinit();
// Inject host functions with context
try host.initHostFunctions(&store, context_ptr);
var module = Module.init(allocator, bytes);
defer module.deinit();
try module.decode();
var instance = Instance.init(allocator, &store, module);
defer instance.deinit();
try instance.instantiate();
// Invoke main entry point
var input = [0]u64{};
var output = [1]u64{0};
try instance.invoke("call", &input, &output, .{
.frame_stack_size = 4096,
.label_stack_size = 4096,
.operand_stack_size = 16384,
});
}
Context Passing
Host functions receive a context pointer to access the AppContext:pub fn initHostFunctions(store: *zware.Store, context_ptr: usize) !void {
try store.exposeHostFunction("env", "u_query_live", u_query_live, context_ptr, ...);
// ...
}
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
// Access server, tree, connections, etc.
}
Use Cases
Real-time Alerting
// WASM function polls live query and sends alerts
loop {
event = u_poll_handle(query_id);
if (event.value > threshold) {
u_writeback_http(POST, alert_url, json_payload);
}
}
Anomaly Detection
// Compare live data to historical baseline
avg_30d = u_query_history("AVG:cpu.usage:[host=x]:[30d,now]");
loop {
event = u_poll_handle(query_id);
if (event.value > avg_30d * 3.0) {
u_writeback_ws(producer_id, "Anomaly detected");
}
}
Stream Aggregation
// Aggregate and downsample
loop {
event = u_poll_handle(query_id);
buffer.append(event);
if (buffer.len >= 1000) {
avg = buffer.average();
u_write_event(tags, avg, now());
buffer.clear();
}
}
Performance Considerations
- Interpretation overhead: zware is an interpreter, not a JIT compiler
- Memory isolation: Separate linear memory space
- Host call cost: Context switching between WASM and native code
- Async yielding:
zio.yield()when polling empty queues - Stack limits: Configured for typical workloads, can be adjusted