Overview
Slung embeds a WebAssembly runtime (zware) that allows you to write custom workflows in any language that compiles to WASM. Workflows have access to a rich set of host functions for querying time series data, writing events, and communicating with external systems.
Architecture
Workflow Execution
Workflows run concurrently with the main server (src/main.zig:394-415):
// Loads the workflow module named on the command line and runs it.
//
// Scans the process arguments for `--wasm <path>`, reads that file (at most
// 64 MiB) into memory, waits on the server's ready notification and then
// hands the module bytes to `execute.spawn`.
//
// Returns `error.InvalidArguments` for any argument other than `--wasm`, or
// when `--wasm` is not followed by a path. Panics if no `--wasm` flag was given.
pub fn handleWasm(allocator: std.mem.Allocator, context: *AppContext) !void {
    var argv = try std.process.argsWithAllocator(allocator);
    defer argv.deinit();

    // Skip the first argument.
    _ = argv.next();

    var module_path: ?[]const u8 = null;
    while (argv.next()) |flag| {
        // `--wasm` is the only flag accepted here.
        if (!std.mem.eql(u8, flag, "--wasm")) return error.InvalidArguments;
        module_path = argv.next() orelse return error.InvalidArguments;
    }

    const resolved_path = module_path orelse @panic("Set the path to the Wasm file with --wasm <path>");

    const wasm_bytes = try std.fs.cwd().readFileAlloc(allocator, resolved_path, 64 * 1024 * 1024);
    defer allocator.free(wasm_bytes);

    // The context is handed to the host layer as a plain address.
    const context_addr = @intFromPtr(context);

    try context.server.notify.wait(); // Wait for server ready
    try execute.spawn(allocator, wasm_bytes, context_addr);
}
WASM Instantiation
Workflows are instantiated with host functions exposed (src/host/execute.zig:9-31):
// Decodes and instantiates the WASM module in `bytes`, then invokes its
// exported `call` entry point with no arguments.
//
// `context_ptr` is forwarded to `host.initHostFunctions` so the registered
// host functions can reach the application context. The store, module and
// instance are all released before returning, in reverse order of creation.
pub fn spawn(allocator: Allocator, bytes: []const u8, context_ptr: usize) !void {
    var store = Store.init(allocator);
    defer store.deinit();

    // Host functions are registered before the module is decoded.
    try host.initHostFunctions(&store, context_ptr);

    var module = Module.init(allocator, bytes);
    defer module.deinit();
    try module.decode();

    var instance = Instance.init(allocator, &store, module);
    defer instance.deinit();
    try instance.instantiate();

    // `call` takes no parameters; one u64 slot is reserved for its result.
    var call_args = [0]u64{};
    var call_results = [1]u64{0};
    try instance.invoke("call", &call_args, &call_results, .{
        .frame_stack_size = 4096,
        .label_stack_size = 4096,
        .operand_stack_size = 16384,
    });
}
Host Functions
Slung exposes 7 host functions to WASM modules (src/host/host.zig:9-24):
Query Functions
u_query_live
Register a persistent query that receives real-time updates.
Signature:
(func $u_query_live (param $filter_ptr i32) (result i64))
Parameters:
filter_ptr: Pointer to Region containing query filter string
Returns:
Query handle (u64) for polling, or 0 on error
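Filter Format:
<OP>:<series>:[<tag filters>]:[<start>,<end>]
where <OP> is one of AVG, SUM, MAX, MIN or COUNT, tag filters are key=value pairs joined with AND, and the trailing time range may be omitted.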
Examples:
AVG:cpu.usage:[host=server-01 AND region=us-west]:[1m,now]
SUM:requests:[service=api]:[5m,now]
COUNT:errors:[severity=critical]
Implementation (host.zig:74-92):
// Host function: registers a live query described by the filter string in the
// Region at the popped `filter_ptr` operand.
//
// Pushes a u64 result: the new query id on success, or 0 when the filter
// cannot be parsed or the query cannot be stored. A failure to read the
// Region or to obtain memory 0 propagates as a `zware.WasmError`.
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);

    const filter_ptr = vm.popOperand(u32);
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);

    // Parse and register query
    const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);
    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(u64, 0);
        return;
    };

    // `u_write_event` iterates `server.queries` while holding
    // `connections_mutex`; take the same lock for the insert so the map is
    // never modified while it is being iterated.
    const registered = blk: {
        context.server.connections_mutex.lock();
        defer context.server.connections_mutex.unlock();
        context.server.queries.put(query_id, query) catch break :blk false;
        break :blk true;
    };
    if (!registered) {
        try vm.pushOperand(u64, 0);
        return;
    }

    try vm.pushOperand(u64, query_id);
}
u_poll_handle
Poll a query handle for new events.
Signature:
(func $u_poll_handle (param $handle i64) (result i32))
Parameters:
handle: Query handle from u_query_live
Returns:
Pointer to Region containing JSON event, or 0 if no event
Event JSON Format:
{
"timestamp" : 1700000000 ,
"value" : 42.5 ,
"tags" : [],
"producers" : [ 12345 ]
}
u_poll_handle yields to the scheduler if no event is available, allowing other tasks to run. This prevents busy-waiting.
Implementation (host.zig:26-58):
// Host function: hands the pending event for a query handle to the guest.
//
// Pops the u64 handle and pushes a u32 result: a pointer to a Region holding
// the event serialized as JSON, or 0 when the handle does not fit in 32 bits,
// no event is queued, or serialization / guest allocation fails. When no
// event is queued the function yields to the scheduler before returning 0.
pub fn u_poll_handle(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);

    const raw_handle = vm.popOperand(u64);
    const handle = std.math.cast(u32, raw_handle) orelse {
        try vm.pushOperand(u32, 0);
        return;
    };

    // Hold the lock only for the duration of the map removal.
    const pending = blk: {
        context.server.connections_mutex.lock();
        defer context.server.connections_mutex.unlock();
        break :blk context.server.query_events.fetchRemove(handle);
    };

    const entry = pending orelse {
        zio.yield() catch {}; // Yield to scheduler
        try vm.pushOperand(u32, 0);
        return;
    };

    // Serialize event to JSON
    const payload = pollEventJson(context.io.allocator, entry.value) catch {
        try vm.pushOperand(u32, 0);
        return;
    };
    defer context.io.allocator.free(payload);

    // Copy the JSON into guest memory and return the Region pointer.
    const memory = try vm.inst.getMemory(0);
    var guest_api = Api.init(vm.inst);
    const region_ptr = allocateAndWriteRegion(&guest_api, memory, payload) catch {
        try vm.pushOperand(u32, 0);
        return;
    };

    try vm.pushOperand(u32, region_ptr);
}
u_free_handle
Free a query handle when done.
Signature:
(func $u_free_handle (param $handle i64) (result i32))
Parameters:
handle: Query handle to free
Returns:
u_query_history
Execute a one-time aggregation query over historical data.
Signature:
(func $u_query_history (param $filter_ptr i32) (result f64))
Parameters:
filter_ptr: Pointer to Region containing query filter
Returns:
Aggregated value (f64), or 0 on error
Implementation (host.zig:94-142):
// Host function: runs a one-off aggregation over stored data.
//
// Pops the u32 pointer to a Region holding the filter string and pushes an
// f64 result: the aggregated value, or 0 when the filter cannot be parsed,
// names no operation, series matching fails, or aggregation fails. A failure
// to read the Region or to obtain memory 0 propagates as a `zware.WasmError`.
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);

    const filter_ptr = vm.popOperand(u32);
    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);

    const query = Query.init(filter_bytes) catch {
        try vm.pushOperand(f64, 0);
        return;
    };

    // Convert query op to QueryOp enum.
    // `const`, not `var`: the value is never reassigned, and the compiler
    // rejects a `var` local that is never mutated.
    const op: QueryOp = switch (query.op) {
        .Avg => QueryOp.AVG,
        .Sum => QueryOp.SUM,
        .Max => QueryOp.MAX,
        .Min => QueryOp.MIN,
        .Count => QueryOp.COUNT,
        .None => {
            // No aggregation requested: report 0.
            try vm.pushOperand(f64, 0);
            return;
        },
    };

    // Find matching series
    const series_keys = context.server.matchingSeriesKeysForQuery(
        context.io.allocator,
        &query,
    ) catch {
        try vm.pushOperand(f64, 0);
        return;
    };
    defer context.io.allocator.free(series_keys);

    // Query and aggregate. Without an explicit time range the whole i64
    // timestamp domain is scanned.
    const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
    const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);
    const value = aggregateHistory(context, series_keys, start, end, op) catch {
        try vm.pushOperand(f64, 0);
        return;
    };

    try vm.pushOperand(f64, value.Float);
}
Write Functions
u_write_event
Write a time series event from the workflow.
Signature:
(func $u_write_event
(param $timestamp_ptr i32)
(param $value_ptr i32)
(param $tags_ptr i32)
(result i32))
Parameters:
timestamp_ptr: Region pointer to timestamp string (microseconds)
value_ptr: Region pointer to value string (float)
tags_ptr: Region pointer to comma-separated tags
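Returns:
0 on success, 1 on failure (unreadable Region, unparsable timestamp/value/tags, or a failed insert)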
Tags Format:
series=<name>,<tag1>,<tag2>,...
Example:
series=alerts,severity=high,service=api,host=server-01
The workflow must include series=<name> in the tags string. This identifies which measurement to write to.
Implementation (host.zig:144-236):
// Host function: writes one float sample into the TSM tree and then feeds it
// to every registered live query that matches it.
//
// Pushes a u32 status onto the operand stack: 0 on success, 1 when a Region
// cannot be read, the timestamp/value/tags cannot be parsed, the series key
// cannot be resolved, or the tree insert fails.
pub fn u_write_event ( vm : * zware . VirtualMachine , context_ptr : usize ) zware . WasmError ! void {
const context : * AppContext = @ptrFromInt ( context_ptr );
// Operands are popped in reverse of the declared parameter order
// (timestamp_ptr, value_ptr, tags_ptr).
const tags_ptr = vm . popOperand ( u32 );
const value_ptr = vm . popOperand ( u32 );
const timestamp_ptr = vm . popOperand ( u32 );
const memory = try vm . inst . getMemory ( 0 );
// Read the three Region-backed string parameters; any unreadable Region yields status 1
const timestamp_bytes = readRegion ( memory , timestamp_ptr ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
const value_bytes = readRegion ( memory , value_ptr ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
const tags_bytes = readRegion ( memory , tags_ptr ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
// Parse timestamp (base-10 i64) and value (f64) after trimming surrounding whitespace
const timestamp = std . fmt . parseInt ( i64 , std . mem . trim ( u8 , timestamp_bytes , " \t\r\n " ), 10 ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
const value = std . fmt . parseFloat ( f64 , std . mem . trim ( u8 , value_bytes , " \t\r\n " )) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
// Parse tags; the result exposes the series name (parsed.series) and the tag list (parsed.tags)
var tags = std . ArrayList ([] const u8 ). empty ;
defer tags . deinit ( context . io . allocator );
const parsed = parseWriteEventTags ( tags_bytes , context . io . allocator , & tags ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
// Build series key from the series name and tags, using key_scratch as working storage
var key_scratch = std . ArrayList ( u8 ). empty ;
defer key_scratch . deinit ( context . io . allocator );
const series_key = context . server . resolveSeriesKey ( & key_scratch , parsed . series , parsed . tags ) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
// Insert into TSM tree; the value is always stored as the Float variant
context . server . tree . insert ( series_key , .{
. timestamp = timestamp ,
. value = .{ . Float = value },
}) catch {
try vm . pushOperand ( u32 , 1 );
return ;
};
// Update live queries. The mutex is released by the defer when the function
// returns, so it is held across the whole loop and the final pushOperand.
context . server . connections_mutex . lock ();
defer context . server . connections_mutex . unlock ();
var iter_queries = context . server . queries . iterator ();
while ( iter_queries . next ()) | entry | {
const query_id = entry . key_ptr . * ;
const q = entry . value_ptr ;
// Check if query matches this event: same series, matching tags, and
// (when the query has a time range) a timestamp inside [time_start, time_end]
if ( ! std . mem . eql ( u8 , q . series , parsed . series )) continue ;
if ( ! q . matchesTags ( parsed . tags )) continue ;
if ( q . has_time_range and ( timestamp < q . time_start or timestamp > q . time_end )) continue ;
// Update the query's running aggregate in place
switch ( q . op ) {
. Avg => {
q . * . op . Avg . count += 1 ;
q . * . op . Avg . sum += value ;
},
. Sum => q . * . op . Sum += value ,
. Count => q . * . op . Count += 1 ,
. Max => if ( value > q . * . op . Max ) q . * . op . Max = value ,
. Min => if ( value < q . * . op . Min ) q . * . op . Min = value ,
. None => {},
}
// Queue event for polling. The queued event carries the raw sample value
// (not the updated aggregate) and producer 0; a failed put is ignored.
// NOTE(review): put is keyed by query_id, so a still-unpolled event for the
// same query is presumably replaced — confirm against the map type.
context . server . query_events . put ( query_id , .{
. timestamp = timestamp ,
. value = value ,
. producer = 0 ,
}) catch {};
}
// Success status
try vm . pushOperand ( u32 , 0 );
}
Communication Functions
u_writeback_http
Send an HTTP request to an external service.
Signature:
(func $u_writeback_http
(param $url_ptr i32)
(param $data_ptr i32)
(param $method i32)
(result i32))
Parameters:
url_ptr: Region pointer to URL string
data_ptr: Region pointer to request body
method: HTTP method (0=GET, 1=POST, 2=PUT, 3=DELETE)
Returns:
Region pointer to response body, or 0 on error
u_writeback_ws
Send a message to a WebSocket client.
Signature:
(func $u_writeback_ws
(param $producer i64)
(param $data_ptr i32)
(result i32))
Parameters:
producer: Producer ID from query event
data_ptr: Region pointer to message data
Returns:
The producer ID comes from the producers array in poll events. This allows workflows to send responses back to the client that triggered the event.
Memory Management
Region Structure
Data is passed between host and guest using Region structures:
// Region: 12 bytes — three consecutive u32 fields located in WASM memory.
// Host code reads and writes the fields by byte offset from the Region pointer.
struct Region {
offset : u32 , // Bytes 0-3: offset of the data in WASM memory
capacity : u32 , // Bytes 4-7: allocated capacity
length : u32 , // Bytes 8-11: actual data length
}
Reading from Guest Memory
// Returns the bytes described by the Region struct stored at `region_ptr` in
// guest memory.
//
// The Region's `offset` (byte 0) and `length` (byte 8) fields are written by
// the guest, so they are validated against the current memory size before
// slicing. A range that does not fit inside memory is reported as
// `error.OutOfBoundsMemoryAccess` rather than slicing past the buffer.
//
// The returned slice aliases guest memory; it is not a copy.
fn readRegion(memory: *zware.Memory, region_ptr: u32) ![]const u8 {
    const data = memory.memory();

    // Read Region struct at region_ptr
    const offset = try memory.read(u32, region_ptr, 0);
    const length = try memory.read(u32, region_ptr, 8);

    // Compare via subtraction so the check cannot overflow, unlike
    // `offset + length` evaluated in u32.
    const start: usize = offset;
    const len: usize = length;
    if (start > data.len or len > data.len - start) {
        return error.OutOfBoundsMemoryAccess;
    }

    return data[start .. start + len];
}
Writing to Guest Memory
// Copies `data_bytes` into guest memory and returns a pointer to a freshly
// allocated Region describing the copy.
//
// Two guest allocations are made through `api.allocate`: one sized for the
// payload and one of 12 bytes for the Region struct. The Region's capacity
// and length are both set to the payload size.
fn allocateAndWriteRegion(
    api: *Api,
    memory: *zware.Memory,
    data_bytes: []const u8,
) !u32 {
    // Allocate space for the payload inside the guest.
    const payload_base: u32 = @intCast(try api.allocate(@intCast(data_bytes.len)));

    // Copy the payload one byte at a time through the bounds-checked writer.
    var i: usize = 0;
    while (i < data_bytes.len) : (i += 1) {
        const dest = payload_base + @as(u32, @intCast(i));
        try memory.write(u8, dest, 0, data_bytes[i]);
    }

    // Allocate the Region struct (12 bytes) and fill in its three fields.
    const region_ptr: u32 = @intCast(try api.allocate(12));
    const payload_len: u32 = @intCast(data_bytes.len);
    try memory.write(u32, region_ptr, 0, payload_base); // offset
    try memory.write(u32, region_ptr, 4, payload_len); // capacity
    try memory.write(u32, region_ptr, 8, payload_len); // length

    return region_ptr;
}
Guest API Functions
WASM modules must export allocator functions:
// Thin wrappers around the allocator functions exported by the guest module.
pub const Api = struct {
    // Invokes the guest's exported `allocate` with `param0` and returns the
    // low 32 bits of the single result slot, reinterpreted as i32.
    pub fn allocate(self: *Self, param0: i32) !i32 {
        // The argument is widened to i64 and passed as a raw u64 slot.
        var args = [_]u64{@bitCast(@as(i64, param0))};
        var results = [_]u64{0};
        try self.instance.invoke("allocate", &args, &results, .{});

        const low_bits: u32 = @truncate(results[0]);
        return @bitCast(low_bits);
    }

    // Invokes the guest's exported `deallocate` with `param0`; no result is read.
    pub fn deallocate(self: *Self, param0: i32) !void {
        var args = [_]u64{@bitCast(@as(i64, param0))};
        var results = [_]u64{};
        try self.instance.invoke("deallocate", &args, &results, .{});
    }
};
Example Workflow
Real-Time Alerting
This workflow monitors CPU usage and sends alerts when thresholds are exceeded:
// Pseudo-code (would compile to WASM)
// Strings are passed to the host functions directly here for readability; the
// real host functions take pointers to Region structs, so a working module
// needs wrappers that build those Regions.
#[no_mangle]
pub extern "C" fn call () {
// Register live query
let query = "AVG:cpu.usage:[host=server-01]:[1m,now]" ;
let handle = u_query_live ( query );
// Runs forever; the host side of u_poll_handle yields when no event is
// queued, so this loop does not busy-wait.
loop {
// Poll for events
if let Some ( event ) = u_poll_handle ( handle ) {
// Parse event JSON
let value = event . value;
// Check threshold
if value > 80.0 {
// Write alert event: timestamp and value are passed as strings, and the
// tags string must start the series with series=<name>
let timestamp = now_micros () . to_string ();
let alert_value = value . to_string ();
let tags = format! ( "series=alerts,severity=high,service=cpu-monitor" );
u_write_event ( & timestamp , & alert_value , & tags );
// Send HTTP notification
let url = "https://alerts.example.com/webhook" ;
let body = format! ( r#"{{"cpu":{},"host":"server-01"}}"# , value );
u_writeback_http ( url , & body , 1 ); // POST
}
}
}
}
Data Enrichment
Enrich incoming events with external data:
// Pseudo-code: Region marshalling for the host calls is omitted.
#[no_mangle]
pub extern "C" fn call () {
// Register a live COUNT query over the last 10 seconds of api requests
let query = "COUNT:requests:[service=api]:[10s,now]" ;
let handle = u_query_live ( query );
loop {
if let Some ( event ) = u_poll_handle ( handle ) {
// Fetch enrichment data
// NOTE(review): the documented event JSON has timestamp, value, tags and
// producers but no host field — presumably host is derived from tags; confirm.
let url = format! ( "https://api.example.com/host/{}" , event . host);
let metadata = u_writeback_http ( & url , "" , 0 ); // GET
// Parse metadata and write enriched event
// ...
}
}
}
WASI Support
Slung provides minimal WASI support for compatibility:
fd_write: Write to stdout/stderr
environ_get: Get environment variables
environ_sizes_get: Get environment sizes
proc_exit: Exit process
Full WASI support is not implemented. File I/O, networking, and other WASI functions are not available. Use Slung host functions instead.
Yielding
Always yield when no events are available to avoid busy-waiting:
// No event is available: give up the CPU instead of spinning, then report "no event".
if ( event == null ) {
zio . yield () catch {}; // Yield to other tasks; a failed yield is ignored
return 0 ;
}
Memory Allocation
Minimize allocations in hot paths:
Reuse buffers when possible
Use stack allocation for small data
Deallocate Regions after use
Query Optimization
Use specific tag filters to reduce series scan
Limit time ranges to avoid scanning all history
Prefer u_query_history for one-time queries
Free handles when done with u_free_handle
Debugging
Workflows can write debug output using WASI fd_write:
;; Writes the message at $msg_ptr / $msg_len to stdout through WASI fd_write.
;; The iovec construction is left out of this sketch.
(func $log (param $msg_ptr i32) (param $msg_len i32)
  ;; Write to stdout (fd=1)
  (local $iov i32)
  (local $written i32)
  ;; Build iovec structure
  ;; ...
  ;; fd_write returns an errno (i32). $log declares no result, so the value
  ;; must be dropped or the function fails validation.
  (drop
    (call $fd_write (i32.const 1) (local.get $iov) (i32.const 1) (local.get $written)))
)
Limitations
Single workflow: Only one WASM module can run at a time
No sandboxing: Workflows have full access to the database
No CPU limits: Long-running computations will block other operations
64MB size limit: WASM modules are capped at 64MB
Next Steps
Architecture Overview Understand the full system architecture
Query DSL Learn the query filter syntax