Core Functions
query_live()
pub fn query_live(filter: &str) -> Result<QueryHandle>
Register a live query and return a handle for polling updates.
Query filter using the syntax AGGREGATE:series:[tag1,tag2,...]
Returns
Result<QueryHandle> - Handle for polling, or Ok(0) if registration fails.
Example
let handle = query_live("AVG:temp:[sensor=1]")?;
query_history()
pub fn query_history(filter: &str) -> Result<f64>
Execute a historical aggregation query and return the aggregate value.
Query filter using the syntax AGGREGATE:series:[tag1,tag2,...]
Returns
Result<f64> - Aggregated value. Returns 0.0 for both valid zero results and some failures.
Example
let avg_temp = query_history("AVG:temp:[sensor=1]")?;
let max_cpu = query_history("MAX:cpu:[host=server1]")?;
println!("Average: {}, Max CPU: {}", avg_temp, max_cpu);
poll_handle()
pub fn poll_handle<F, A>(handle: QueryHandle, callback: F, args: A) -> Result<()>
where
F: Fn(Event, A) -> Result<()>,
A: Clone,
Poll a live query handle forever, invoking the callback for each received event.
Handle returned by query_live()
Function to invoke for each event. Signature: fn(Event, A) -> Result<()>
Arguments to pass to the callback on each invocation. Must be Clone.
Returns
Result<()> - Does not return under normal operation. Only returns if the callback returns an error.
Example
let handle = query_live("SUM:requests:[]")?;
poll_handle(handle, |event, threshold| {
if event.value > threshold {
println!("High load: {}", event.value);
}
Ok(())
}, 1000.0)?;
poll_handle_state()
pub fn poll_handle_state(handle: QueryHandle) -> Result<Option<PollState>>
Poll a live query handle once and return the current aggregate state if available.
Handle returned by query_live()
Returns
Result<Option<PollState>> - Current aggregate state, or None if no update is available.
Example
let handle = query_live("AVG:temp:[sensor=1]")?;
if let Some(state) = poll_handle_state(handle)? {
match state {
PollState::Avg(avg) => {
let value = avg.sum / avg.count as f64;
println!("Average: {} (n={})", value, avg.count);
}
_ => {}
}
}
free_handle()
pub fn free_handle(handle: QueryHandle) -> Result<()>
Free a live query handle on the host, releasing associated resources.
Returns
Result<()> - Returns error if the host fails to free the handle.
Example
let handle = query_live("AVG:temp:[]")?;
// ... use handle ...
free_handle(handle)?;
write_event()
pub fn write_event(timestamp: i64, value: f64, tags: Vec<String>) -> Result<()>
Write a new event to the host.
Unix timestamp in microseconds
Numeric value associated with the event
Event tags encoded as CSV. The tag series=<name> is interpreted specially as the series name. All other tags are treated as regular tags.
Returns
Result<()> - Returns error if the host fails to write the event.
Example
let timestamp = unix_micros();
let value = 42.0;
let tags = vec![
"series=derived".to_string(),
"source=workflow".to_string(),
"env=prod".to_string()
];
write_event(timestamp, value, tags)?;
writeback_ws()
pub fn writeback_ws(destination: u64, data: &str) -> Result<()>
Write text data back to a WebSocket producer connection.
Producer connection ID from event.producers
Returns
Result<()> - Returns error if the writeback fails.
Example
fn on_event(event: Event, _: ()) -> Result<()> {
if event.value > 100.0 {
for producer in event.producers {
writeback_ws(producer, "ALERT: threshold exceeded")?;
}
}
Ok(())
}
writeback_http()
pub fn writeback_http(
destination: &str,
data: &str,
method: WritebackMethod
) -> Result<Option<Vec<u8>>>
Send an HTTP request through the host.
HTTP method: GET, POST, PUT, or DELETE
Returns
Result<Option<Vec<u8>>> - Response body bytes, or None if no response body is returned or the request fails.
Example
use slung::WritebackMethod;
let response = writeback_http(
"https://api.example.com/webhook",
r#"{"value": 42.0, "alert": true}"#,
WritebackMethod::POST
)?;
if let Some(body) = response {
let text = String::from_utf8_lossy(&body);
println!("Response: {}", text);
}
unix_micros()
pub fn unix_micros() -> i64
Return the current Unix epoch timestamp in microseconds.
Returns
i64 - Current time in microseconds since Unix epoch.
Example
let timestamp = unix_micros();
write_event(timestamp, 23.5, vec!["series=temp".to_string()])?;
Types
Event
pub struct Event {
pub timestamp: i64, // Unix timestamp in microseconds
pub value: f64, // Numeric value
pub tags: Vec<String>, // Event tags
pub producers: Vec<u64>, // Producer connection IDs
}
Event payload returned by host polling.
QueryHandle
pub type QueryHandle = u64;
Opaque handle returned by query_live() for polling live queries.
PollState
pub enum PollState {
Sum(f64),
Avg(Avg),
Min(f64),
Max(f64),
}
Aggregate state variants returned by poll_handle_state().
Avg
pub struct Avg {
pub sum: f64,
pub count: u64,
}
Running average state. Calculate the average as sum / count as f64.
WritebackMethod
pub enum WritebackMethod {
GET,
POST,
PUT,
DELETE,
}
HTTP method for writeback_http() requests.