Documentation Index Fetch the complete documentation index at: https://mintlify.com/slunghq/slung/llms.txt
Use this file to discover all available pages before exploring further.
Live queries let you subscribe to continuous aggregates over streaming data. The runtime maintains these aggregates and pushes updates to your workflow as new events arrive.
Registering a Live Query
Use query_live() to register a subscription:
let handle = query_live ( "AVG:temp:[sensor=1]" ) ? ;
Function signature:
pub fn query_live ( filter : & str ) -> Result < QueryHandle >
Parameters:
filter: &str - Query in the format OP:SERIES:[TAGS]:[RANGE]
Returns:
Ok(QueryHandle) - A handle to poll for events (type u64)
Ok(0) - Host registration failed (0 is the sentinel value)
Host function:
unsafe extern "C" {
fn u_query_live ( filter_ptr : usize ) -> u64 ;
}
The SDK passes a pointer to a Region containing UTF-8 filter bytes. The host returns a query handle or 0 on failure.
Query Syntax
Operators
AVG - Running average
MIN - Minimum value
MAX - Maximum value
SUM - Sum of values
COUNT - Event count
Tag Filters
Use boolean operators to filter:
AVG:cpu:[host=prod AND region=us-west]
SUM:requests:[status=200 OR status=201]
MAX:latency:[NOT region=test]
Time Ranges
Optional range specification:
AVG:temp:[sensor=1]:[1h] # Last hour
SUM:bytes:[app=api]:[30min] # Last 30 minutes
COUNT:errors:[]:[1d] # Last day, no tag filter
Supported units: sec, min, hour, day, week (singular and plural forms)
You can also use microsecond timestamps:
AVG:temp:[sensor=1]:[1609459200000000,1609545600000000]
Polling for Events
Once you have a handle, poll it for events.
Continuous Polling
Use poll_handle() to poll forever:
let handle = query_live ( "AVG:temp:[sensor=1]" ) ? ;
poll_handle ( handle , on_event , 100.0 ) ? ;
Function signature:
pub fn poll_handle < F , A >( handle : QueryHandle , callback : F , args : A ) -> Result <()>
where
F : Fn ( Event , A ) -> Result <()>,
A : Clone
Parameters:
handle: QueryHandle - Handle from query_live()
callback: F - Function called for each event
args: A - Argument passed to the callback (must be Clone)
Returns:
This function does not return under normal operation. It loops forever, polling the handle and invoking your callback.
Host function:
unsafe extern "C" {
fn u_poll_handle ( handle_ptr : u64 ) -> usize ;
}
Returns a pointer to a Region containing JSON event data, or 0 if no event is available.
Example with State
use slung :: prelude ::* ;
#[main]
fn main () -> Result <()> {
let handle = query_live ( "AVG:temp:[sensor=1]" ) ? ;
poll_handle ( handle , on_event , 100.0 ) ? ;
Ok (())
}
fn on_event ( event : Event , alert_threshold : f64 ) -> Result <()> {
if event . value > alert_threshold {
println! ( "event timestamp={} value={}" , event . timestamp, event . value);
for producer in event . producers {
writeback_ws ( producer , "ALERT: threshold exceeded" ) ? ;
}
}
Ok (())
}
Single Poll
Use poll_handle_state() to poll once:
let handle = query_live ( "AVG:temp:[sensor=1]" ) ? ;
loop {
if let Some ( state ) = poll_handle_state ( handle ) ? {
match state {
PollState :: Avg ( avg ) => {
println! ( "avg={} count={}" , avg . sum / avg . count as f64 , avg . count);
}
PollState :: Sum ( val ) => println! ( "sum={}" , val ),
PollState :: Min ( val ) => println! ( "min={}" , val ),
PollState :: Max ( val ) => println! ( "max={}" , val ),
}
}
}
Function signature:
pub fn poll_handle_state ( handle : QueryHandle ) -> Result < Option < PollState >>
Returns:
Ok(Some(PollState)) - Aggregate state if available
Ok(None) - No event available yet
Event Structure
Polled events have this structure:
pub struct Event {
pub timestamp : i64 , // Unix timestamp in microseconds
pub value : f64 , // Aggregated numeric value
pub tags : Vec < String >, // Event tags
pub producers : Vec < u64 >, // Producer connection IDs
}
Field Descriptions
timestamp : Unix timestamp in microseconds when the event occurred
value : The aggregated value (e.g., running average, sum, max)
tags : String tags attached to the event
producers : WebSocket connection IDs that contributed to this aggregate. Use these for writeback.
Aggregate States
The PollState enum represents different aggregate types:
pub enum PollState {
Sum ( f64 ),
Avg ( Avg ),
Min ( f64 ),
Max ( f64 ),
}
pub struct Avg {
pub sum : f64 ,
pub count : u64 ,
}
For AVG queries, you get both sum and count to compute the average:
let avg_value = avg . sum / avg . count as f64 ;
Freeing Handles
When you’re done with a handle, free it:
Function signature:
pub fn free_handle ( handle : QueryHandle ) -> Result <()>
Host function:
unsafe extern "C" {
fn u_free_handle ( handle_ptr : u64 ) -> u32 ;
}
Returns 0 on success, non-zero on failure.
Multiple Query Example
You can register multiple queries and poll them independently:
use slung :: prelude ::* ;
#[main]
fn main () -> Result <()> {
let temp_handle = query_live ( "AVG:temp:[sensor=1]" ) ? ;
let cpu_handle = query_live ( "MAX:cpu:[host=prod]" ) ? ;
// Poll both in the same callback
std :: thread :: spawn ( move || {
poll_handle ( temp_handle , on_temp_event , ())
});
poll_handle ( cpu_handle , on_cpu_event , ()) ? ;
Ok (())
}
fn on_temp_event ( event : Event , _ : ()) -> Result <()> {
println! ( "temp: {}" , event . value);
Ok (())
}
fn on_cpu_event ( event : Event , _ : ()) -> Result <()> {
println! ( "cpu: {}" , event . value);
Ok (())
}
Best Practices
Handle errors : Always check return values. query_live() returns 0 on failure.
Free handles : Call free_handle() when done to release resources.
Keep queries specific : Use tag filters to reduce event volume.
Use appropriate aggregates : Choose the operator that matches your use case.
Clone args carefully : The args parameter must be Clone and is cloned on each event.
Next Steps
Historical Queries Query aggregated historical data
Writeback Send results to WebSocket or HTTP endpoints