Skip to main content

Query DSL

Slung provides a compact query language for filtering and aggregating time-series data:
OP:SERIES:[TAGS]:[RANGE]
Implemented in src/query.zig.

Components

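Operation (OP) - Aggregation to apply. The OP segment must always be present, because the parser treats the first `:`-separated segment as OP. A value that matches none of the keywords below falls back to SUM: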
  • AVG - Average of values
  • MIN - Minimum value
  • MAX - Maximum value
  • SUM - Sum of values
  • COUNT - Count of points
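Series - Metric name to query (required, must be non-empty).
Tags - Filter expression in brackets (required; use [] to match every event). Operators are applied strictly left to right with no precedence, so `a OR b AND c` means `(a OR b) AND c`: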
  • AND - Both conditions must match
  • OR - Either condition can match
  • NOT - Negation operator (unary); `a NOT b` is accepted and treated as `a AND NOT b`
Range - Time range in brackets (optional):
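  • Absolute timestamps: [1700000000,1700000100]. Integers are used as-is, with no unit conversion. Relative times are computed in microseconds, so absolute values must use the same unit as the stored event timestamps.
  • Relative time: [1m,now], [2h,now], [5d,now]. `now` and relative offsets are resolved once, when the query is parsed, as microseconds before that moment.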
  • Supported units: s/sec/second, m/min/minute, h/hr/hour, d/day, w/wk/week

Examples

AVG:cpu.total:[region=us-west AND env=prod]:[1h,now]
MIN:temperature:[sensor=outdoor]:[24h,now]
MAX:memory.used:[host=server-1 OR host=server-2]:[1700000000,1700000100]
SUM:requests:[path=/api AND NOT status=200]
COUNT:errors:[level=critical]:[5m,now]

Query Structure

/// Parsed form of an `OP:SERIES:[TAGS]:[RANGE]` query string.
pub const Query = struct {
    // NOTE(review): MAX_SERIES is not referenced in this excerpt; presumably
    // used elsewhere in src/query.zig — confirm before relying on it.
    const MAX_SERIES = 16;
    /// Capacity of `tags`: the most tokens (tags plus operators) a single
    /// tag expression may hold.
    const MAX_TAG_TOKENS = 32;

    /// Boolean operators accepted inside the `[TAGS]` expression.
    pub const TagOp = enum {
        and_op,
        or_op,
        not_op,
    };

    /// One token of the tag expression, kept in source (infix) order.
    pub const TagToken = union(enum) {
        /// A raw tag term such as `env=prod`, borrowed from the query string.
        tag: []const u8,
        /// An AND / OR / NOT operator.
        op: TagOp,
    };

    /// Aggregation state; live queries update it in place as events arrive.
    op: PollState,
    /// Metric name, borrowed from the query string passed to `init`.
    series: []const u8,
    /// Tag-expression tokens; only the first `tags_len` entries are valid.
    tags: [MAX_TAG_TOKENS]TagToken,
    /// Number of valid entries in `tags` (0 means "match every event").
    tags_len: usize,
    /// Whether a `[RANGE]` segment was supplied.
    has_time_range: bool,
    /// Inclusive lower bound; meaningful only when `has_time_range` is true.
    time_start: i64,
    /// Inclusive upper bound; meaningful only when `has_time_range` is true.
    time_end: i64,
};

Parsing

Query Initialization

/// Parse a query string of the form `OP:SERIES:[TAGS]` or
/// `OP:SERIES:[TAGS]:[RANGE]`.
///
/// The OP, SERIES and TAGS segments are required; RANGE is optional. An
/// unrecognised OP falls back to SUM (see `parseOp`).
///
/// The returned query borrows `series` and every tag token from `filter`, so
/// `filter` must outlive the query.
///
/// Errors: `error.InvalidQuery` when a required segment is missing, SERIES
/// is empty, or extra `:`-separated segments follow RANGE; plus any error
/// returned by `parseTags` or `parseRange`.
pub fn init(filter: []const u8) !Self {
    var iter_parts = std.mem.splitScalar(u8, filter, ':');

    const op_part = iter_parts.next() orelse return error.InvalidQuery;
    const series = iter_parts.next() orelse return error.InvalidQuery;
    // An empty series name would silently match nothing.
    if (series.len == 0) return error.InvalidQuery;
    const tags_part = iter_parts.next() orelse return error.InvalidQuery;
    const range_part = iter_parts.next();
    // Trailing segments used to be ignored, hiding malformed queries.
    if (iter_parts.next() != null) return error.InvalidQuery;

    var query = Self{
        // `op_part` is the prefix of `filter` up to the first ':', so prefix
        // matching on it selects the same operation as matching on `filter`.
        .op = parseOp(op_part),
        .series = series,
        .tags = undefined,
        .tags_len = 0,
        .has_time_range = false,
        .time_start = 0,
        .time_end = 0,
    };
    query.tags_len = try parseTags(tags_part, query.tags[0..]);

    if (range_part) |raw_range| {
        const range = try parseRange(raw_range);
        query.has_time_range = true;
        query.time_start = range.start;
        query.time_end = range.end;
    }

    return query;
}

Operation Parsing

/// Choose the initial aggregation state from the leading OP keyword.
///
/// Matching is by prefix on `filter` and is case-sensitive. Anything that
/// starts with none of AVG/MIN/MAX/COUNT (including "SUM" itself) yields a
/// zeroed SUM state.
fn parseOp(filter: []const u8) PollState {
    const hasPrefix = std.mem.startsWith;

    if (hasPrefix(u8, filter, "AVG")) return .{ .Avg = .{ .sum = 0, .count = 0 } };
    // MIN/MAX start from the opposite infinity so the first value always wins.
    if (hasPrefix(u8, filter, "MIN")) return .{ .Min = std.math.inf(f64) };
    if (hasPrefix(u8, filter, "MAX")) return .{ .Max = -std.math.inf(f64) };
    if (hasPrefix(u8, filter, "COUNT")) return .{ .Count = 0 };

    // Explicit "SUM" and the unrecognised-keyword fallback share one state.
    return .{ .Sum = 0 };
}

Tag Parsing

Parses infix boolean expressions:
/// Tokenise a bracketed infix tag expression such as `[env=prod AND NOT host=a]`
/// into `out`, returning the number of tokens written.
///
/// Tokens are separated by any run of spaces, tabs, CR or LF. Tags and
/// AND/OR operators must alternate. Any number of `NOT`s may precede a tag,
/// and `NOT` is also accepted in operator position (`a NOT b`). An empty
/// expression (`[]`) yields 0 tokens.
///
/// Tag tokens are slices of `tags`; they are not copied.
///
/// Errors:
///   - `error.InvalidTags`: missing brackets, two tags or two binary
///     operators in a row, or a trailing operator.
///   - `error.TooManyTags`: the expression needs more than `out.len` tokens.
fn parseTags(tags: []const u8, out: []TagToken) error{ InvalidTags, TooManyTags }!usize {
    const whitespace = " \t\r\n";

    var inner = std.mem.trim(u8, tags, whitespace);
    if (inner.len < 2 or inner[0] != '[' or inner[inner.len - 1] != ']') {
        return error.InvalidTags;
    }
    inner = std.mem.trim(u8, inner[1 .. inner.len - 1], whitespace);
    if (inner.len == 0) return 0;

    // tokenizeAny skips empty tokens and splits on every whitespace kind, so
    // tab-separated terms no longer fuse into a single bogus tag.
    var iter_tags = std.mem.tokenizeAny(u8, inner, whitespace);
    var expect_tag = true;
    var out_len: usize = 0;

    while (iter_tags.next()) |token| {
        // Previously unchecked: overflowing `out` was a panic (or UB in ReleaseFast).
        if (out_len >= out.len) return error.TooManyTags;

        if (expect_tag) {
            // Only unary NOT may appear where a tag is expected.
            if (parseTagOp(token)) |op| {
                if (op != .not_op) return error.InvalidTags;
                out[out_len] = .{ .op = .not_op };
                out_len += 1;
                continue;
            }
            out[out_len] = .{ .tag = token };
            expect_tag = false;
        } else {
            const op = parseTagOp(token) orelse return error.InvalidTags;
            out[out_len] = .{ .op = op };
            expect_tag = true;
        }
        out_len += 1;
    }

    // Ending while still expecting a tag means a dangling operator.
    if (expect_tag) return error.InvalidTags;
    return out_len;
}

Time Range Parsing

Supports relative and absolute times:
/// Resolve one endpoint of a `[start,end]` time range to an i64 timestamp.
///
/// Accepted forms (surrounding whitespace is ignored):
///   - `now` (any case): returns `now_us` unchanged.
///   - An integer whose last character is a digit (e.g. `1700000000`):
///     returned as-is, with no unit conversion.
///     NOTE(review): relative specs are computed in microseconds, so absolute
///     values must already be in that unit to be comparable — confirm against
///     stored timestamps.
///   - `<digits><unit>` (e.g. `5m`, `2 h`, `1day`): `now_us` minus the
///     duration in microseconds. Units are case-insensitive: s/sec/second,
///     m/min/minute, h/hr/hour, d/day, w/wk/week.
///
/// Errors:
///   - `error.InvalidRange`: empty, malformed, or unknown-unit specs.
///   - `error.Overflow`: a number or the resulting time does not fit in i64.
///   - `error.InvalidCharacter`: kept in the error set for source
///     compatibility with existing callers, but no longer returned.
fn parseTimeSpec(raw: []const u8, now_us: i64) error{ InvalidRange, InvalidCharacter, Overflow }!i64 {
    const whitespace = " \t\r\n";
    const spec = std.mem.trim(u8, raw, whitespace);
    if (spec.len == 0) return error.InvalidRange;

    if (std.ascii.eqlIgnoreCase(spec, "now")) return now_us;

    // Absolute timestamp: anything ending in a digit must parse as an integer.
    if (std.ascii.isDigit(spec[spec.len - 1])) {
        return std.fmt.parseInt(i64, spec, 10) catch |err| switch (err) {
            error.InvalidCharacter => error.InvalidRange,
            error.Overflow => error.Overflow,
        };
    }

    // Relative time: leading decimal digits followed by a unit suffix.
    var amount_end: usize = 0;
    while (amount_end < spec.len and std.ascii.isDigit(spec[amount_end])) : (amount_end += 1) {}
    // A missing amount ("h", "-5m") is malformed. Report it like the absolute
    // branch does instead of leaking parseInt's InvalidCharacter.
    if (amount_end == 0) return error.InvalidRange;

    // Only digits remain in this slice, so parseInt can fail only with Overflow.
    const amount = try std.fmt.parseInt(i64, spec[0..amount_end], 10);
    const unit_seconds = relativeUnitSeconds(std.mem.trim(u8, spec[amount_end..], whitespace)) orelse
        return error.InvalidRange;

    const duration_seconds = try std.math.mul(i64, amount, unit_seconds);
    const duration_us = try std.math.mul(i64, duration_seconds, 1_000_000);
    return std.math.sub(i64, now_us, duration_us);
}

/// Map a relative-time unit suffix (case-insensitive) to its length in
/// seconds, or return null if the suffix is not recognised.
fn relativeUnitSeconds(unit: []const u8) ?i64 {
    const UnitAlias = struct { name: []const u8, seconds: i64 };
    const aliases = [_]UnitAlias{
        .{ .name = "s", .seconds = 1 },
        .{ .name = "sec", .seconds = 1 },
        .{ .name = "second", .seconds = 1 },
        .{ .name = "m", .seconds = 60 },
        .{ .name = "min", .seconds = 60 },
        .{ .name = "minute", .seconds = 60 },
        .{ .name = "h", .seconds = 60 * 60 },
        .{ .name = "hr", .seconds = 60 * 60 },
        .{ .name = "hour", .seconds = 60 * 60 },
        .{ .name = "d", .seconds = 60 * 60 * 24 },
        .{ .name = "day", .seconds = 60 * 60 * 24 },
        .{ .name = "w", .seconds = 60 * 60 * 24 * 7 },
        .{ .name = "wk", .seconds = 60 * 60 * 24 * 7 },
        .{ .name = "week", .seconds = 60 * 60 * 24 * 7 },
    };
    for (aliases) |alias| {
        if (std.ascii.eqlIgnoreCase(unit, alias.name)) return alias.seconds;
    }
    return null;
}

Tag Filtering

Tag matching evaluates the boolean expression strictly left to right, with no operator precedence, so `a OR b AND c` means `(a OR b) AND c`:
/// Evaluate this query's tag expression against an event's tag list.
///
/// An empty expression matches every event. Operators are folded strictly
/// left to right with no precedence, so `a OR b AND c` is `(a OR b) AND c`.
/// A `NOT` in operator position (`a NOT b`) is treated as `a AND NOT b`.
/// The result is `false` when a tag appears where an operator is expected,
/// or when an operand helper returns null.
pub fn matchesTags(self: *const Self, tags: []const []const u8) bool {
    if (self.tags_len == 0) return true;

    var pos: usize = 0;
    var result = self.consumeOperand(tags, &pos) orelse return false;

    while (pos < self.tags_len) {
        // After an operand, the next token must be an operator.
        const combinator = switch (self.tags[pos]) {
            .op => |o| o,
            .tag => return false,
        };
        pos += 1;

        // NOT in operator position negates the following operand, then ANDs it in.
        const negate_next = combinator == .not_op;
        const operand = self.consumeOperandWithNegation(tags, &pos, negate_next) orelse return false;

        result = switch (combinator) {
            .or_op => result or operand,
            .and_op, .not_op => result and operand,
        };
    }

    return result;
}
Example evaluation:
Query: [env=prod AND NOT host=test]
Tags: ["env=prod", "host=server-1", "region=us"]

Step 1: env=prod → true (found in tags)
Step 2: AND operator
Step 3: NOT host=test → true (NOT found in tags)
Step 4: true AND true → true (match)

Query Execution

Queries execute against the TSM tree:

Historical Queries

From WASM host function (src/host/host.zig):
/// WASM host import: run a one-shot aggregation over stored history.
///
/// Pops a guest pointer to the query string, parses it with `Query.init`,
/// and pushes the aggregate as an f64. Once the query bytes have been read,
/// every failure pushes 0: a parse error, a `.None` operation, a series-lookup
/// error, or an aggregation error. Callers therefore cannot distinguish
/// failure from a genuine 0 result.
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);

    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);

    const answer: f64 = blk: {
        const query = Query.init(filter_bytes) catch break :blk 0;

        // Translate the parsed aggregation into the storage-layer operation.
        const op: QueryOp = switch (query.op) {
            .Avg => .AVG,
            .Sum => .SUM,
            .Max => .MAX,
            .Min => .MIN,
            .Count => .COUNT,
            .None => break :blk 0,
        };

        const series_keys = context.server.matchingSeriesKeysForQuery(context.io.allocator, &query) catch break :blk 0;
        defer context.io.allocator.free(series_keys);

        // With no explicit range, scan the whole i64 timestamp domain.
        const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
        const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);

        const aggregate = aggregateHistory(context, series_keys, start, end, op) catch break :blk 0;
        break :blk aggregate.Float;
    };

    try vm.pushOperand(f64, answer);
}

Multi-Series Aggregation

/// Fold every point of `series_keys` returned by `tree.queryRaw(key, start, end)`
/// into a single `Value.Float` according to `op`.
///
/// - Float and Int points, and Bool points (as 0/1), feed AVG/SUM/MIN/MAX.
/// - Bytes points have no numeric value, so they are excluded from those
///   aggregates but still counted by COUNT.
/// - AVG, MIN and MAX over zero numeric points yield 0.
/// - Series whose `queryRaw` call fails are logged and skipped, so the
///   result covers only the series that could be read.
fn aggregateHistory(context: *AppContext, series_keys: []const []const u8, start: i64, end: i64, op: QueryOp) !Value {
    var point_count: u64 = 0; // every point, numeric or not (for COUNT)
    var numeric_count: u64 = 0; // points that contributed to sum/min/max
    var sum: f64 = 0;
    var min_value = std.math.inf(f64);
    var max_value = -std.math.inf(f64);

    for (series_keys) |series_key| {
        const values = context.server.tree.queryRaw(series_key, start, end) catch |err| {
            std.log.warn("query history: skipping series {s}: {s}", .{ series_key, @errorName(err) });
            continue;
        };
        // NOTE(review): assumes queryRaw allocates its result with context.io.allocator — confirm.
        defer context.io.allocator.free(values);

        for (values) |value| {
            point_count += 1;
            const f: f64 = switch (value) {
                .Float => |v| v,
                .Int => |v| @as(f64, @floatFromInt(v)),
                .Bool => |v| @as(f64, @floatFromInt(@intFromBool(v))),
                // Treating bytes as 0 used to drag AVG/MIN toward zero.
                .Bytes => continue,
            };
            sum += f;
            if (f < min_value) min_value = f;
            if (f > max_value) max_value = f;
            numeric_count += 1;
        }
    }

    return switch (op) {
        .AVG => Value{ .Float = if (numeric_count == 0) 0 else sum / @as(f64, @floatFromInt(numeric_count)) },
        .SUM => Value{ .Float = sum },
        .COUNT => Value{ .Float = @as(f64, @floatFromInt(point_count)) },
        .MIN => Value{ .Float = if (numeric_count == 0) 0 else min_value },
        .MAX => Value{ .Float = if (numeric_count == 0) 0 else max_value },
    };
}

Live Queries

Live queries track state and update on incoming events:
/// WASM host import: register a live query and push its id as a u64.
///
/// Pops a guest pointer to the query string, reserves an id from
/// `next_query_id`, parses the query, and stores it in `server.queries`.
/// Pushes 0 if parsing or registration fails.
///
/// NOTE(review): presumably `next_query_id` starts above 0, so that 0 never
/// collides with a real id — confirm.
/// NOTE(review): `Query.init` borrows `series` and the tag slices from
/// `filter_bytes`. If `readRegion` returns a view into guest linear memory
/// rather than a copy, the stored query aliases memory the guest may
/// overwrite or that a memory grow may relocate — confirm.
pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
    const context: *AppContext = @ptrFromInt(context_ptr);
    const filter_ptr = vm.popOperand(u32);

    const memory = try vm.inst.getMemory(0);
    const filter_bytes = try readRegion(memory, filter_ptr);

    // The id is reserved before parsing, so a failed registration still consumes one.
    const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);

    const handle: u64 = blk: {
        const query = Query.init(filter_bytes) catch break :blk 0;
        context.server.queries.put(query_id, query) catch break :blk 0;
        break :blk query_id;
    };

    try vm.pushOperand(u64, handle);
}
When events arrive, they update matching queries:
// Excerpt from u_write_event: fold one incoming event into every registered
// live query it matches, then queue the event for polling.
var iter_queries = context.server.queries.iterator();
while (iter_queries.next()) |entry| {
    const query_id = entry.key_ptr.*;
    // Pointer into the map, so the aggregation updates below persist.
    const q = entry.value_ptr;
    
    // Check if event matches query: exact series name, tag expression, then time range.
    if (!std.mem.eql(u8, q.series, parsed.series)) continue;
    if (!q.matchesTags(parsed.tags)) continue;
    // Range bounds are inclusive.
    // NOTE(review): time_start/time_end were fixed when the query was parsed.
    // If `now` resolved to the registration time (presumably via
    // parseTimeSpec), a `[..,now]` live query skips every later event here —
    // confirm this is intended.
    if (q.has_time_range and (timestamp < q.time_start or timestamp > q.time_end)) continue;
    
    // Update aggregation state (running sum/count for AVG, extremes for MIN/MAX).
    switch (q.op) {
        .Avg => {
            q.*.op.Avg.count += 1;
            q.*.op.Avg.sum += value;
        },
        .Sum => q.*.op.Sum += value,
        .Count => q.*.op.Count += 1,
        .Max => if (value > q.*.op.Max) q.*.op.Max = value,
        .Min => if (value < q.*.op.Min) q.*.op.Min = value,
        .None => {},
    }
    
    // Queue event for polling.
    // NOTE(review): `put` keyed by query_id — if query_events is a hash map,
    // only the most recent event per query is kept until polled. Queueing
    // failures are silently dropped by `catch {}`.
    context.server.query_events.put(query_id, .{
        .timestamp = timestamp,
        .value = value,
        .producer = 0,
    }) catch {};
}

Performance

Query performance from billion-point benchmark:
  • 1M point aggregation: ~160ms average
  • Series filtering: O(1) Bloom filter check + O(log n) index lookup
  • Range scan: Sequential read from skip list (cache) or decompressed pages (disk)
  • Multi-series aggregation: matching series are scanned one after another and folded into a single running aggregate

Build docs developers (and LLMs) love