Query DSL
Slung provides a compact query language for filtering and aggregating time-series data:OP:SERIES:[TAGS]:[RANGE]
src/query.zig.
Components
Operation (OP) - Optional aggregation:AVG- Average of valuesMIN- Minimum valueMAX- Maximum valueSUM- Sum of valuesCOUNT- Count of points
AND- Both conditions must matchOR- Either condition can matchNOT- Negation operator (unary)
- Absolute timestamps:
[1700000000,1700000100] - Relative time:
[1m,now],[2h,now],[5d,now] - Supported units: s/sec/second, m/min/minute, h/hr/hour, d/day, w/wk/week
Examples
AVG:cpu.total:[region=us-west AND env=prod]:[1h,now]
MIN:temperature:[sensor=outdoor]:[24h,now]
MAX:memory.used:[host=server-1 OR host=server-2]:[1700000000,1700000100]
SUM:requests:[path=/api AND NOT status=200]
COUNT:errors:[level=critical]:[5m,now]
Query Structure
pub const Query = struct {
const MAX_SERIES = 16;
const MAX_TAG_TOKENS = 32;
pub const TagOp = enum {
and_op,
or_op,
not_op,
};
pub const TagToken = union(enum) {
tag: []const u8,
op: TagOp,
};
op: PollState,
series: []const u8,
tags: [MAX_TAG_TOKENS]TagToken,
tags_len: usize,
has_time_range: bool,
time_start: i64,
time_end: i64,
};
Parsing
Query Initialization
pub fn init(filter: []const u8) !Self {
var query = Self{
.op = parseOp(filter),
.series = "",
.tags = undefined,
.tags_len = 0,
.has_time_range = false,
.time_start = 0,
.time_end = 0,
};
var iter_parts = std.mem.splitScalar(u8, filter, ':');
_ = iter_parts.next() orelse return error.InvalidQuery;
query.series = iter_parts.next() orelse return error.InvalidQuery;
const tags_part = iter_parts.next() orelse return error.InvalidQuery;
query.tags_len = try parseTags(tags_part, query.tags[0..]);
if (iter_parts.next()) |range_part| {
const range = try parseRange(range_part);
query.has_time_range = true;
query.time_start = range.start;
query.time_end = range.end;
}
return query;
}
Operation Parsing
fn parseOp(filter: []const u8) PollState {
if (std.mem.startsWith(u8, filter, "AVG")) {
return .{ .Avg = .{ .sum = 0, .count = 0 } };
} else if (std.mem.startsWith(u8, filter, "MIN")) {
return .{ .Min = std.math.inf(f64) };
} else if (std.mem.startsWith(u8, filter, "MAX")) {
return .{ .Max = -std.math.inf(f64) };
} else if (std.mem.startsWith(u8, filter, "SUM")) {
return .{ .Sum = 0 };
} else if (std.mem.startsWith(u8, filter, "COUNT")) {
return .{ .Count = 0 };
} else {
return .{ .Sum = 0 };
}
}
Tag Parsing
Parses infix boolean expressions:fn parseTags(tags: []const u8, out: []TagToken) error{ InvalidTags, TooManyTags }!usize {
var inner = std.mem.trim(u8, tags, " \t\r\n");
if (inner.len < 2 or inner[0] != '[' or inner[inner.len - 1] != ']') {
return error.InvalidTags;
}
inner = std.mem.trim(u8, inner[1 .. inner.len - 1], " \t\r\n");
if (inner.len == 0) return 0;
var iter_tags = std.mem.splitScalar(u8, inner, ' ');
var expect_tag = true;
var out_len: usize = 0;
while (iter_tags.next()) |raw| {
const token = std.mem.trim(u8, raw, " \t\r\n");
if (token.len == 0) continue;
if (expect_tag) {
// Handle unary NOT
if (parseTagOp(token)) |op| {
if (op != .not_op) return error.InvalidTags;
out[out_len] = .{ .op = .not_op };
out_len += 1;
continue;
}
out[out_len] = .{ .tag = token };
out_len += 1;
expect_tag = false;
} else {
const op = parseTagOp(token) orelse return error.InvalidTags;
out[out_len] = .{ .op = op };
out_len += 1;
expect_tag = true;
}
}
if (expect_tag) return error.InvalidTags;
return out_len;
}
Time Range Parsing
Supports relative and absolute times:fn parseTimeSpec(raw: []const u8, now_us: i64) error{ InvalidRange, InvalidCharacter, Overflow }!i64 {
const spec = std.mem.trim(u8, raw, " \t\r\n");
if (spec.len == 0) return error.InvalidRange;
// Handle "now" keyword
if (std.ascii.eqlIgnoreCase(spec, "now")) {
return now_us;
}
// Handle absolute timestamp
if (std.ascii.isDigit(spec[spec.len - 1])) {
return std.fmt.parseInt(i64, spec, 10) catch |err| switch (err) {
error.InvalidCharacter => error.InvalidRange,
error.Overflow => error.Overflow,
};
}
// Parse relative time (e.g. "5m", "2h", "1d")
var amount_end: usize = 0;
while (amount_end < spec.len and std.ascii.isDigit(spec[amount_end])) : (amount_end += 1) {}
const amount = try std.fmt.parseInt(i64, spec[0..amount_end], 10);
const unit_raw = std.mem.trim(u8, spec[amount_end..], " \t\r\n");
const unit_seconds: i64 = if (std.ascii.eqlIgnoreCase(unit_raw, "s") or
std.ascii.eqlIgnoreCase(unit_raw, "sec") or
std.ascii.eqlIgnoreCase(unit_raw, "second"))
1
else if (std.ascii.eqlIgnoreCase(unit_raw, "m") or
std.ascii.eqlIgnoreCase(unit_raw, "min") or
std.ascii.eqlIgnoreCase(unit_raw, "minute"))
60
else if (std.ascii.eqlIgnoreCase(unit_raw, "h") or
std.ascii.eqlIgnoreCase(unit_raw, "hr") or
std.ascii.eqlIgnoreCase(unit_raw, "hour"))
60 * 60
else if (std.ascii.eqlIgnoreCase(unit_raw, "d") or
std.ascii.eqlIgnoreCase(unit_raw, "day"))
60 * 60 * 24
else if (std.ascii.eqlIgnoreCase(unit_raw, "w") or
std.ascii.eqlIgnoreCase(unit_raw, "wk") or
std.ascii.eqlIgnoreCase(unit_raw, "week"))
60 * 60 * 24 * 7
else
return error.InvalidRange;
const duration_seconds = try std.math.mul(i64, amount, unit_seconds);
const duration_us = try std.math.mul(i64, duration_seconds, 1_000_000);
return std.math.sub(i64, now_us, duration_us) catch return error.Overflow;
}
Tag Filtering
Tag matching evaluates boolean expressions:pub fn matchesTags(self: *const Self, tags: []const []const u8) bool {
if (self.tags_len == 0) return true;
var idx: usize = 0;
var current = self.consumeOperand(tags, &idx) orelse return false;
while (idx < self.tags_len) {
const token = self.tags[idx];
idx += 1;
const op = switch (token) {
.op => |v| v,
.tag => return false,
};
var rhs_negated = false;
var combine_op = op;
if (op == .not_op) {
combine_op = .and_op;
rhs_negated = true;
}
const rhs = self.consumeOperandWithNegation(tags, &idx, rhs_negated) orelse return false;
current = switch (combine_op) {
.and_op => current and rhs,
.or_op => current or rhs,
.not_op => unreachable,
};
}
return current;
}
Query: [env=prod AND NOT host=test]
Tags: ["env=prod", "host=server-1", "region=us"]
Step 1: env=prod → true (found in tags)
Step 2: AND operator
Step 3: NOT host=test → true (NOT found in tags)
Step 4: true AND true → true (match)
Query Execution
Queries execute against the TSM tree:Historical Queries
From WASM host function (src/host/host.zig):
pub fn u_query_history(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const filter_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const filter_bytes = try readRegion(memory, filter_ptr);
const query = Query.init(filter_bytes) catch {
try vm.pushOperand(f64, 0);
return;
};
// Map query operation to TSM operation
var op: QueryOp = undefined;
switch (query.op) {
.Avg => op = QueryOp.AVG,
.Sum => op = QueryOp.SUM,
.Max => op = QueryOp.MAX,
.Min => op = QueryOp.MIN,
.Count => op = QueryOp.COUNT,
.None => {
try vm.pushOperand(f64, 0);
return;
},
}
// Find matching series
const series_keys = context.server.matchingSeriesKeysForQuery(context.io.allocator, &query) catch {
try vm.pushOperand(f64, 0);
return;
};
defer context.io.allocator.free(series_keys);
// Execute query
const start = if (query.has_time_range) query.time_start else std.math.minInt(i64);
const end = if (query.has_time_range) query.time_end else std.math.maxInt(i64);
const value = aggregateHistory(context, series_keys, start, end, op) catch {
try vm.pushOperand(f64, 0);
return;
};
try vm.pushOperand(f64, value.Float);
}
Multi-Series Aggregation
fn aggregateHistory(context: *AppContext, series_keys: []const []const u8, start: i64, end: i64, op: QueryOp) !Value {
var count: u64 = 0;
var sum: f64 = 0;
var min = std.math.inf(f64);
var max = -std.math.inf(f64);
for (series_keys) |series_key| {
const values = context.server.tree.queryRaw(series_key, start, end) catch continue;
defer context.io.allocator.free(values);
for (values) |value| {
const f = switch (value) {
.Float => |v| v,
.Int => |v| @as(f64, @floatFromInt(v)),
.Bool => |v| @as(f64, @floatFromInt(@intFromBool(v))),
.Bytes => 0,
};
sum += f;
if (f < min) min = f;
if (f > max) max = f;
count += 1;
}
}
return switch (op) {
.AVG => Value{ .Float = if (count == 0) 0 else sum / @as(f64, @floatFromInt(count)) },
.SUM => Value{ .Float = sum },
.COUNT => Value{ .Float = @as(f64, @floatFromInt(count)) },
.MIN => Value{ .Float = if (count == 0) 0 else min },
.MAX => Value{ .Float = if (count == 0) 0 else max },
};
}
Live Queries
Live queries track state and update on incoming events:pub fn u_query_live(vm: *zware.VirtualMachine, context_ptr: usize) zware.WasmError!void {
const context: *AppContext = @ptrFromInt(context_ptr);
const filter_ptr = vm.popOperand(u32);
const memory = try vm.inst.getMemory(0);
const filter_bytes = try readRegion(memory, filter_ptr);
const query_id = context.server.next_query_id.fetchAdd(1, .monotonic);
const query = Query.init(filter_bytes) catch {
try vm.pushOperand(u64, 0);
return;
};
// Register query for live updates
context.server.queries.put(query_id, query) catch {
try vm.pushOperand(u64, 0);
return;
};
try vm.pushOperand(u64, query_id);
}
// In u_write_event
var iter_queries = context.server.queries.iterator();
while (iter_queries.next()) |entry| {
const query_id = entry.key_ptr.*;
const q = entry.value_ptr;
// Check if event matches query
if (!std.mem.eql(u8, q.series, parsed.series)) continue;
if (!q.matchesTags(parsed.tags)) continue;
if (q.has_time_range and (timestamp < q.time_start or timestamp > q.time_end)) continue;
// Update aggregation state
switch (q.op) {
.Avg => {
q.*.op.Avg.count += 1;
q.*.op.Avg.sum += value;
},
.Sum => q.*.op.Sum += value,
.Count => q.*.op.Count += 1,
.Max => if (value > q.*.op.Max) q.*.op.Max = value,
.Min => if (value < q.*.op.Min) q.*.op.Min = value,
.None => {},
}
// Queue event for polling
context.server.query_events.put(query_id, .{
.timestamp = timestamp,
.value = value,
.producer = 0,
}) catch {};
}
Performance
Query performance from billion-point benchmark:- 1M point aggregation: ~160ms average
- Series filtering: O(1) Bloom filter check + O(log n) index lookup
- Range scan: Sequential read from skip list (cache) or decompressed pages (disk)
- Multi-series aggregation: Parallel iteration over series