Skip to main content

Core Functions

query_live()

pub fn query_live(filter: &str) -> Result<QueryHandle>
Register a live query and return a handle for polling updates.
filter
&str
required
Query filter using the syntax AGGREGATE:series:[tag1,tag2,...]

Returns

Result<QueryHandle> - Handle for polling, or Ok(0) if registration fails.

Example

let handle = query_live("AVG:temp:[sensor=1]")?;

query_history()

pub fn query_history(filter: &str) -> Result<f64>
Execute a historical aggregation query and return the aggregate value.
filter
&str
required
Query filter using the syntax AGGREGATE:series:[tag1,tag2,...]

Returns

Result<f64> - Aggregated value. Returns 0.0 for both valid zero results and some failures.

Example

let avg_temp = query_history("AVG:temp:[sensor=1]")?;
let max_cpu = query_history("MAX:cpu:[host=server1]")?;

println!("Average: {}, Max CPU: {}", avg_temp, max_cpu);

poll_handle()

pub fn poll_handle<F, A>(handle: QueryHandle, callback: F, args: A) -> Result<()>
where
    F: Fn(Event, A) -> Result<()>,
    A: Clone,
Poll a live query handle forever, invoking the callback for each received event.
handle
QueryHandle
required
Handle returned by query_live()
callback
F
required
Function to invoke for each event. Signature: fn(Event, A) -> Result<()>
args
A
required
Arguments to pass to the callback on each invocation. Must be Clone.

Returns

Result<()> - Does not return under normal operation. Only returns if the callback returns an error.

Example

let handle = query_live("SUM:requests:[]")?;

poll_handle(handle, |event, threshold| {
    if event.value > threshold {
        println!("High load: {}", event.value);
    }
    Ok(())
}, 1000.0)?;

poll_handle_state()

pub fn poll_handle_state(handle: QueryHandle) -> Result<Option<PollState>>
Poll a live query handle once and return the current aggregate state if available.
handle
QueryHandle
required
Handle returned by query_live()

Returns

Result<Option<PollState>> - Current aggregate state, or None if no update is available.

Example

let handle = query_live("AVG:temp:[sensor=1]")?;

if let Some(state) = poll_handle_state(handle)? {
    match state {
        PollState::Avg(avg) => {
            let value = avg.sum / avg.count as f64;
            println!("Average: {} (n={})", value, avg.count);
        }
        _ => {}
    }
}

free_handle()

pub fn free_handle(handle: QueryHandle) -> Result<()>
Free a live query handle on the host, releasing associated resources.
handle
QueryHandle
required
Handle to free

Returns

Result<()> - Returns error if the host fails to free the handle.

Example

let handle = query_live("AVG:temp:[]")?;
// ... use handle ...
free_handle(handle)?;

write_event()

pub fn write_event(timestamp: i64, value: f64, tags: Vec<String>) -> Result<()>
Write a new event to the host.
timestamp
i64
required
Unix timestamp in microseconds
value
f64
required
Numeric value associated with the event
tags
Vec<String>
required
Event tags encoded as CSV. The tag series=<name> is interpreted specially as the series name. All other tags are treated as regular tags.

Returns

Result<()> - Returns error if the host fails to write the event.

Example

let timestamp = unix_micros();
let value = 42.0;
let tags = vec![
    "series=derived".to_string(),
    "source=workflow".to_string(),
    "env=prod".to_string()
];

write_event(timestamp, value, tags)?;

writeback_ws()

pub fn writeback_ws(destination: u64, data: &str) -> Result<()>
Write text data back to a WebSocket producer connection.
destination
u64
required
Producer connection ID from event.producers
data
&str
required
Text data to send

Returns

Result<()> - Returns error if the writeback fails.

Example

fn on_event(event: Event, _: ()) -> Result<()> {
    if event.value > 100.0 {
        for producer in event.producers {
            writeback_ws(producer, "ALERT: threshold exceeded")?;
        }
    }
    Ok(())
}

writeback_http()

pub fn writeback_http(
    destination: &str,
    data: &str,
    method: WritebackMethod
) -> Result<Option<Vec<u8>>>
Send an HTTP request through the host.
destination
&str
required
HTTP URL for the request
data
&str
required
Request body data
method
WritebackMethod
required
HTTP method: GET, POST, PUT, or DELETE

Returns

Result<Option<Vec<u8>>> - Response body bytes, or None if no response body is returned or the request fails.

Example

use slung::WritebackMethod;

let response = writeback_http(
    "https://api.example.com/webhook",
    r#"{"value": 42.0, "alert": true}"#,
    WritebackMethod::POST
)?;

if let Some(body) = response {
    let text = String::from_utf8_lossy(&body);
    println!("Response: {}", text);
}

unix_micros()

pub fn unix_micros() -> i64
Return the current Unix epoch timestamp in microseconds.

Returns

i64 - Current time in microseconds since Unix epoch.

Example

let timestamp = unix_micros();
write_event(timestamp, 23.5, vec!["series=temp".to_string()])?;

Types

Event

pub struct Event {
    pub timestamp: i64,      // Unix timestamp in microseconds
    pub value: f64,          // Numeric value
    pub tags: Vec<String>,   // Event tags
    pub producers: Vec<u64>, // Producer connection IDs
}
Event payload returned by host polling.

QueryHandle

pub type QueryHandle = u64;
Opaque handle returned by query_live() for polling live queries.

PollState

pub enum PollState {
    Sum(f64),
    Avg(Avg),
    Min(f64),
    Max(f64),
}
Aggregate state variants returned by poll_handle_state().

Avg

pub struct Avg {
    pub sum: f64,
    pub count: u64,
}
Running average state. Calculate the average as sum / count as f64.

WritebackMethod

pub enum WritebackMethod {
    GET,
    POST,
    PUT,
    DELETE,
}
HTTP method for writeback_http() requests.

Build docs developers (and LLMs) love