Live queries let you subscribe to continuous aggregates over streaming data. The runtime maintains these aggregates and pushes updates to your workflow as new events arrive.

Registering a Live Query

Use query_live() to register a subscription:
let handle = query_live("AVG:temp:[sensor=1]")?;
Function signature:
pub fn query_live(filter: &str) -> Result<QueryHandle>
Parameters:
  • filter: &str - Query in the format OP:SERIES:[TAGS]:[RANGE]
Returns:
  • Ok(QueryHandle) - A handle (type u64) that you poll for events
  • Ok(0) - Host registration failed. 0 is the sentinel value, so check for it before polling.
Host function:
unsafe extern "C" {
    fn u_query_live(filter_ptr: usize) -> u64;
}
The SDK passes a pointer to a Region containing UTF-8 filter bytes. The host returns a query handle or 0 on failure.
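Because a failed registration is reported as a zero handle rather than as an Err, it can help to convert the sentinel into an explicit error at the call site. The following is a minimal sketch under that reading of the text above; ensure_registered is a hypothetical helper, not part of the SDK, and QueryHandle is declared locally as u64 so the snippet stands alone.

```rust
/// Local stand-in for the SDK's handle type (described above as a u64).
type QueryHandle = u64;

/// Hypothetical helper: map the zero sentinel to an explicit error.
fn ensure_registered(handle: QueryHandle, filter: &str) -> Result<QueryHandle, String> {
    if handle == 0 {
        // 0 is the sentinel the host uses to signal a failed registration.
        Err(format!("live query registration failed for filter `{}`", filter))
    } else {
        Ok(handle)
    }
}
```

In a workflow this would wrap the value returned by query_live() before any polling starts.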

Query Syntax

OP:SERIES:[TAGS]:[RANGE]

Operators

  • AVG - Running average
  • MIN - Minimum value
  • MAX - Maximum value
  • SUM - Sum of values
  • COUNT - Event count
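If you build filter strings dynamically, validating the operator before calling query_live() can catch typos early. The sketch below is an illustration only: the Operator enum and operator_of function are hypothetical names, and it assumes operators are written in uppercase exactly as listed above.

```rust
use std::str::FromStr;

/// Hypothetical enum mirroring the five operators listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Operator {
    Avg,
    Min,
    Max,
    Sum,
    Count,
}

impl FromStr for Operator {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Assumption: operator names are case-sensitive and uppercase.
        match s {
            "AVG" => Ok(Operator::Avg),
            "MIN" => Ok(Operator::Min),
            "MAX" => Ok(Operator::Max),
            "SUM" => Ok(Operator::Sum),
            "COUNT" => Ok(Operator::Count),
            other => Err(format!("unknown operator: {}", other)),
        }
    }
}

/// Read the operator from the text before the first ':' of a filter string.
fn operator_of(query: &str) -> Result<Operator, String> {
    query.split(':').next().unwrap_or("").parse()
}
```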

Tag Filters

Use boolean operators to filter:
AVG:cpu:[host=prod AND region=us-west]
SUM:requests:[status=200 OR status=201]
MAX:latency:[NOT region=test]
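Tag filters like the ones above are plain strings, so they can be assembled from key/value pairs. This is a rough sketch with hypothetical helpers (all_of and live_filter); it only covers AND clauses and performs no escaping of keys or values.

```rust
/// Hypothetical helper: join `key=value` pairs with AND inside brackets.
/// An empty slice yields "[]", the form used above for "no tag filter".
fn all_of(pairs: &[(&str, &str)]) -> String {
    let clauses: Vec<String> = pairs
        .iter()
        .map(|(key, value)| format!("{}={}", key, value))
        .collect();
    format!("[{}]", clauses.join(" AND "))
}

/// Hypothetical helper: assemble OP:SERIES:[TAGS] from its parts.
fn live_filter(op: &str, series: &str, tags: &str) -> String {
    format!("{}:{}:{}", op, series, tags)
}
```

For example, live_filter("AVG", "cpu", &all_of(&[("host", "prod"), ("region", "us-west")])) produces the first filter shown above.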

Time Ranges

Optional range specification:
AVG:temp:[sensor=1]:[1h]        # Last hour
SUM:bytes:[app=api]:[30min]    # Last 30 minutes
COUNT:errors:[]:[1d]            # Last day, no tag filter
Supported units: sec, min, hour, day, week (singular and plural forms).
You can also use microsecond timestamps:
AVG:temp:[sensor=1]:[1609459200000000,1609545600000000]
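The absolute form is a pair of microsecond timestamps separated by a comma. A small sketch of validating such a range before embedding it in a filter is shown below. parse_absolute_range is a hypothetical helper, and rejecting ranges whose start is after their end is an assumption about what the runtime expects, not documented behavior.

```rust
/// Parse "[start,end]" (microsecond Unix timestamps) into a (start, end) pair.
/// Returns None for malformed input or, by assumption, when start > end.
fn parse_absolute_range(range: &str) -> Option<(i64, i64)> {
    let inner = range.strip_prefix('[')?.strip_suffix(']')?;
    let (start, end) = inner.split_once(',')?;
    let start: i64 = start.trim().parse().ok()?;
    let end: i64 = end.trim().parse().ok()?;
    if start <= end {
        Some((start, end))
    } else {
        None
    }
}
```

The range in the example above spans exactly one day: the two timestamps differ by 86,400,000,000 microseconds.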

Polling for Events

Once you have a handle, poll it for events.

Continuous Polling

Use poll_handle() to poll forever:
let handle = query_live("AVG:temp:[sensor=1]")?;
poll_handle(handle, on_event, 100.0)?;
Function signature:
pub fn poll_handle<F, A>(handle: QueryHandle, callback: F, args: A) -> Result<()>
where
    F: Fn(Event, A) -> Result<()>,
    A: Clone
Parameters:
  • handle: QueryHandle - Handle from query_live()
  • callback: F - Function called for each event
  • args: A - Argument passed to the callback (must be Clone)
Returns: This function does not return under normal operation. It loops forever, polling the handle and invoking your callback.
Host function:
unsafe extern "C" {
    fn u_poll_handle(handle_ptr: u64) -> usize;
}
Returns a pointer to a Region containing JSON event data, or 0 if no event is available.
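To make the callback contract concrete, here is a simplified model of the dispatch loop. It is not the SDK's implementation: drive is a hypothetical function, Event is a trimmed local stand-in, and the event source is a closure instead of the host call. Unlike poll_handle(), it stops when the source is empty so that it can be run and tested.

```rust
/// Trimmed local stand-in for the SDK's Event (only the fields used here).
#[derive(Debug, Clone)]
struct Event {
    timestamp: i64,
    value: f64,
}

/// Pull events from `next_event` and pass each to `callback` together with a
/// fresh clone of `args`. Returns how many events were delivered.
fn drive<S, F, A, E>(mut next_event: S, callback: F, args: A) -> Result<usize, E>
where
    S: FnMut() -> Option<Event>,
    F: Fn(Event, A) -> Result<(), E>,
    A: Clone,
{
    let mut delivered = 0;
    while let Some(event) = next_event() {
        // `args` is cloned once per event, which is why A must be Clone.
        callback(event, args.clone())?;
        delivered += 1;
    }
    Ok(delivered)
}
```

The first callback error ends the loop and is returned to the caller, which is one plausible reading of the Result<()> callback signature above.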

Example with State

use slung::prelude::*;

#[main]
fn main() -> Result<()> {
    let handle = query_live("AVG:temp:[sensor=1]")?;
    poll_handle(handle, on_event, 100.0)?;
    Ok(())
}

fn on_event(event: Event, alert_threshold: f64) -> Result<()> {
    if event.value > alert_threshold {
        println!("event timestamp={} value={}", event.timestamp, event.value);
        for producer in event.producers {
            writeback_ws(producer, "ALERT: threshold exceeded")?;
        }
    }
    Ok(())
}

Single Poll

Use poll_handle_state() to poll once per call. Wrap it in your own loop to keep polling:
let handle = query_live("AVG:temp:[sensor=1]")?;

loop {
    if let Some(state) = poll_handle_state(handle)? {
        match state {
            PollState::Avg(avg) => {
                println!("avg={} count={}", avg.sum / avg.count as f64, avg.count);
            }
            PollState::Sum(val) => println!("sum={}", val),
            PollState::Min(val) => println!("min={}", val),
            PollState::Max(val) => println!("max={}", val),
        }
    }
}
Function signature:
pub fn poll_handle_state(handle: QueryHandle) -> Result<Option<PollState>>
Returns:
  • Ok(Some(PollState)) - Aggregate state if available
  • Ok(None) - No event available yet
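Since Ok(None) only means that nothing is available yet, a tight loop around a single poll will spin. One option is a bounded retry with a pause between attempts, sketched below. poll_with_retry is a hypothetical helper that takes the poll as a closure, and whether std::thread::sleep is available in your workflow's runtime is an assumption you should verify.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Call `poll` up to `max_attempts` times, sleeping `delay` after each empty
/// result except the last. Returns the first available value, if any.
fn poll_with_retry<P, T, E>(
    mut poll: P,
    max_attempts: u32,
    delay: Duration,
) -> Result<Option<T>, E>
where
    P: FnMut() -> Result<Option<T>, E>,
{
    for attempt in 0..max_attempts {
        if let Some(state) = poll()? {
            return Ok(Some(state));
        }
        if attempt + 1 < max_attempts {
            sleep(delay);
        }
    }
    Ok(None)
}
```

In a workflow the closure would be something like || poll_handle_state(handle).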

Event Structure

Polled events have this structure:
pub struct Event {
    pub timestamp: i64,      // Unix timestamp in microseconds
    pub value: f64,          // Aggregated numeric value
    pub tags: Vec<String>,   // Event tags
    pub producers: Vec<u64>, // Producer connection IDs
}

Field Descriptions

  • timestamp: Unix timestamp in microseconds when the event occurred
  • value: The aggregated value (e.g., running average, sum, max)
  • tags: String tags attached to the event
  • producers: WebSocket connection IDs that contributed to this aggregate. Use these for writeback.
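Because timestamp is in microseconds rather than seconds or milliseconds, converting it with the wrong unit is an easy mistake. The sketch below uses only the standard library; event_time is a hypothetical helper name.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Convert a microsecond Unix timestamp into a SystemTime.
/// Returns None for negative timestamps or on overflow.
fn event_time(timestamp_micros: i64) -> Option<SystemTime> {
    if timestamp_micros < 0 {
        return None;
    }
    UNIX_EPOCH.checked_add(Duration::from_micros(timestamp_micros as u64))
}
```

You would call it as event_time(event.timestamp) inside a callback.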

Aggregate States

The PollState enum represents different aggregate types:
pub enum PollState {
    Sum(f64),
    Avg(Avg),
    Min(f64),
    Max(f64),
}

pub struct Avg {
    pub sum: f64,
    pub count: u64,
}
For AVG queries, you get both sum and count to compute the average:
let avg_value = avg.sum / avg.count as f64;
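The division above yields NaN when count is 0. Whether the runtime can ever report an empty average is not stated here, so a defensive variant may be worthwhile. In this sketch, Avg is a local mirror of the struct above and mean is a hypothetical helper.

```rust
/// Local mirror of the Avg struct shown above.
struct Avg {
    sum: f64,
    count: u64,
}

/// Compute the mean, returning None instead of NaN when no samples exist.
fn mean(avg: &Avg) -> Option<f64> {
    if avg.count == 0 {
        None
    } else {
        Some(avg.sum / avg.count as f64)
    }
}
```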

Freeing Handles

When you’re done with a handle, free it:
free_handle(handle)?;
Function signature:
pub fn free_handle(handle: QueryHandle) -> Result<()>
Host function:
unsafe extern "C" {
    fn u_free_handle(handle_ptr: u64) -> u32;
}
Returns 0 on success, non-zero on failure.
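If a handle is used with poll_handle_state() in a scope that can exit early, a small guard type can make sure the handle is freed on every path. The sketch below is one possible pattern, not something the SDK provides: HandleGuard is hypothetical, and the cleanup is passed in as a closure so the example is self-contained.

```rust
/// Hypothetical guard that runs a cleanup closure once when it is dropped.
struct HandleGuard<F: FnMut(u64)> {
    handle: u64,
    free: F,
}

impl<F: FnMut(u64)> HandleGuard<F> {
    fn new(handle: u64, free: F) -> Self {
        HandleGuard { handle, free }
    }

    /// The wrapped handle, for passing to polling functions.
    fn handle(&self) -> u64 {
        self.handle
    }
}

impl<F: FnMut(u64)> Drop for HandleGuard<F> {
    fn drop(&mut self) {
        // Runs on normal scope exit and on early returns via `?`.
        (self.free)(self.handle);
    }
}
```

In a workflow the closure could call free_handle(h) and log any error, since Drop cannot propagate a Result.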

Multiple Query Example

You can register multiple queries and poll them independently:
use slung::prelude::*;

#[main]
fn main() -> Result<()> {
    let temp_handle = query_live("AVG:temp:[sensor=1]")?;
    let cpu_handle = query_live("MAX:cpu:[host=prod]")?;
    
    // Poll each handle with its own callback: temp on a background thread, cpu on this one
    std::thread::spawn(move || {
        poll_handle(temp_handle, on_temp_event, ())
    });
    
    poll_handle(cpu_handle, on_cpu_event, ())?;
    
    Ok(())
}

fn on_temp_event(event: Event, _: ()) -> Result<()> {
    println!("temp: {}", event.value);
    Ok(())
}

fn on_cpu_event(event: Event, _: ()) -> Result<()> {
    println!("cpu: {}", event.value);
    Ok(())
}
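An alternative to spawning a thread per query is to poll every handle once per pass in a single loop, for example with poll_handle_state(). The sketch below models one such pass. poll_round is a hypothetical helper, and the poll is a closure returning a plain f64 so the snippet runs without the SDK.

```rust
/// Poll each handle once, in order, and collect (handle, value) for every
/// handle that had a value ready during this pass.
fn poll_round<P>(handles: &[u64], mut poll: P) -> Vec<(u64, f64)>
where
    P: FnMut(u64) -> Option<f64>,
{
    handles
        .iter()
        .filter_map(|&handle| poll(handle).map(|value| (handle, value)))
        .collect()
}
```

Calling this repeatedly, with a pause between passes, gives each query a turn without relying on thread support in the runtime.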

Best Practices

  1. Handle errors: Always check return values. query_live() yields a handle of 0 when host registration fails.
  2. Free handles: Call free_handle() when done to release resources.
  3. Keep queries specific: Use tag filters to reduce event volume.
  4. Use appropriate aggregates: Choose the operator that matches your use case.
  5. Clone args carefully: The args parameter must be Clone and is cloned on each event.
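For the last point, wrapping larger callback state in an Arc keeps the per-event clone cheap, because cloning an Arc copies a pointer and bumps a reference count instead of duplicating the data. The sketch below illustrates this with a hypothetical AlertConfig type and should_alert function.

```rust
use std::sync::Arc;

/// Hypothetical callback configuration that would be costly to deep-copy.
struct AlertConfig {
    threshold: f64,
    recipients: Vec<String>,
}

/// Decide whether to alert, taking the shared config by Arc as a callback would.
fn should_alert(value: f64, config: Arc<AlertConfig>) -> bool {
    value > config.threshold && !config.recipients.is_empty()
}
```

Passing an Arc<AlertConfig> as args satisfies the Clone bound without copying the recipients list on every event.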

Next Steps

Historical Queries

Query aggregated historical data

Writeback

Send results to WebSocket or HTTP endpoints
