Documentation Index
Fetch the complete documentation index at: https://mintlify.com/getsentry/snuba/llms.txt
Use this file to discover all available pages before exploring further.
The Query Builder API allows you to construct and execute queries programmatically without writing SnQL strings.
Building Queries
Snuba provides a programmatic query builder for constructing queries:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, ConditionFunctions
from snuba.datasets.entities.entity_key import EntityKey
from datetime import datetime
# Create a query
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[
Column("event_id", None, "event_id"),
Column("project_id", None, "project_id"),
Column("timestamp", None, "timestamp"),
],
condition=binary_condition(
ConditionFunctions.AND,
binary_condition(
ConditionFunctions.EQ,
Column("project_id", None, "project_id"),
Literal(None, 1)
),
binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime(2024, 1, 1))
)
),
limit=100
)
Source: snuba/query/logical.py
Query Class
The Query class represents a logical query:
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.datasets.entities.entity_key import EntityKey
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[...],
condition=None,
groupby=[],
having=None,
order_by=[],
limit=None,
offset=0,
limitby=None,
totals=False,
granularity=None,
)
Query Parameters
from_clause
The entity or data source to query
selected_columns
Sequence[Expression]
required
List of columns/expressions to select
groupby
Optional[Sequence[Expression]]
GROUP BY expressions
order_by
Optional[Sequence[OrderBy]]
ORDER BY clauses
offset
OFFSET value (default: 0)
totals
Include totals row (default: False)
Expressions
Expressions represent query components:
Column
from snuba.query.expressions import Column
# Simple column
event_id = Column("event_id", None, "event_id")
# Aliased column
count_col = Column("count", "event_count", "count")
# Column with table alias
aliased = Column("project_id", "e", "project_id")
Literal
from snuba.query.expressions import Literal
from datetime import datetime
# Integer literal
project_id = Literal(None, 1)
# String literal
level = Literal(None, "error")
# DateTime literal
start_time = Literal(None, datetime(2024, 1, 1))
# Null literal
null_val = Literal(None, None)
Function Call
from snuba.query.expressions import FunctionCall, Column, Literal
# Count function
count = FunctionCall(
"count",
"count",
tuple()
)
# Count distinct
count_distinct = FunctionCall(
"uniq",
"count_distinct_users",
(Column("user_id", None, "user_id"),)
)
# Date truncation
trunc_timestamp = FunctionCall(
"toStartOfHour",
"hour",
(Column("timestamp", None, "timestamp"),)
)
Conditions
Build WHERE clause conditions:
from snuba.query.conditions import (
binary_condition,
ConditionFunctions,
combine_and_conditions,
combine_or_conditions,
)
from snuba.query.expressions import Column, Literal
# Simple equality
eq_condition = binary_condition(
ConditionFunctions.EQ,
Column("project_id", None, "project_id"),
Literal(None, 1)
)
# Greater than or equal
gte_condition = binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime(2024, 1, 1))
)
# IN clause
in_condition = binary_condition(
ConditionFunctions.IN,
Column("project_id", None, "project_id"),
Literal(None, [1, 2, 3])
)
# Combine conditions with AND
and_condition = combine_and_conditions([
eq_condition,
gte_condition,
in_condition
])
# Combine conditions with OR
or_condition = combine_or_conditions([
eq_condition,
gte_condition
])
Source: snuba/query/conditions.py
Available Condition Functions
ConditionFunctions.GTE
Greater than or equal: >=
ConditionFunctions.NOT_IN
Not in list: NOT IN
Executing Queries
Execute queries using the run_query function:
from snuba.datasets.factory import get_dataset
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
import uuid
# Build your query
query = Query(...)
# Create attribution info
attribution_info = AttributionInfo(
tenant_ids={"organization_id": 1, "referrer": "my_service"},
referrer="my_service",
app_id="my_app",
parent_api="api"
)
# Create request
request = Request(
id=uuid.uuid4(),
original_body={},
query=query,
query_settings=HTTPQuerySettings(),
attribution_info=attribution_info
)
# Get dataset
dataset = get_dataset("events")
# Create timer
timer = Timer("query")
# Execute query
result = run_query(
dataset=dataset,
request=request,
timer=timer,
robust=False
)
# Access results
for row in result.result["data"]:
print(row)
# Access metadata
print(f"SQL: {result.extra['sql']}")
print(f"Stats: {result.extra['stats']}")
Source: snuba/web/query.py:69
Query Settings
Configure query execution:
from snuba.query.query_settings import HTTPQuerySettings
settings = HTTPQuerySettings(
turbo=False,
consistent=False,
debug=True,
dry_run=False
)
# Set custom clickhouse settings
settings.set_resource_quota(ResourceQuota(max_threads=4))
settings.push_clickhouse_setting("max_execution_time", 30)
Source: snuba/query/query_settings.py
HTTPQuerySettings Methods
Check if turbo mode is enabled. Returns: bool
Check if consistent reads are enabled. Returns: bool
Check if debug mode is enabled. Returns: bool
Check if dry run mode is enabled. Returns: bool
set_resource_quota()
Set the resource quota for the query. Parameters:
quota (ResourceQuota): Resource quota limits
push_clickhouse_setting()
Add a ClickHouse setting override. Parameters:
key (str): Setting name
value (Any): Setting value
Complete Example
Here’s a complete example building and executing a query:
from snuba.datasets.factory import get_dataset
from snuba.datasets.entities.entity_key import EntityKey
from snuba.query.logical import Query
from snuba.query.data_source.simple import Entity
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import binary_condition, combine_and_conditions, ConditionFunctions
from snuba.request import Request
from snuba.query.query_settings import HTTPQuerySettings
from snuba.attribution.attribution_info import AttributionInfo
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from datetime import datetime, timedelta
import uuid
# Build query: Count errors by project in last hour
query = Query(
from_clause=Entity(EntityKey.EVENTS, None),
selected_columns=[
Column("project_id", None, "project_id"),
FunctionCall(
"count",
"error_count",
tuple()
)
],
condition=combine_and_conditions([
binary_condition(
ConditionFunctions.EQ,
Column("level", None, "level"),
Literal(None, "error")
),
binary_condition(
ConditionFunctions.GTE,
Column("timestamp", None, "timestamp"),
Literal(None, datetime.now() - timedelta(hours=1))
),
binary_condition(
ConditionFunctions.LT,
Column("timestamp", None, "timestamp"),
Literal(None, datetime.now())
)
]),
groupby=[
Column("project_id", None, "project_id")
],
order_by=[
OrderBy(
expression=FunctionCall("count", "error_count", tuple()),
direction=OrderByDirection.DESC
)
],
limit=10
)
# Create request
attribution_info = AttributionInfo(
tenant_ids={"organization_id": 1, "referrer": "error_monitor"},
referrer="error_monitor",
app_id="monitoring",
parent_api="api"
)
request = Request(
id=uuid.uuid4(),
original_body={},
query=query,
query_settings=HTTPQuerySettings(debug=True),
attribution_info=attribution_info
)
# Execute
dataset = get_dataset("events")
timer = Timer("error_count_query")
try:
result = run_query(dataset, request, timer)
print("Top 10 projects by error count:")
for row in result.result["data"]:
print(f" Project {row['project_id']}: {row['error_count']} errors")
print(f"\nQuery took {timer.for_json()['duration_ms']}ms")
print(f"SQL: {result.extra['sql']}")
except Exception as e:
print(f"Query failed: {e}")
SnQL Parsing
You can also parse SnQL strings into Query objects:
from snuba.request.validation import parse_snql_query
snql = """
MATCH (events)
SELECT event_id, project_id, timestamp
WHERE project_id = 1
AND timestamp >= toDateTime('2024-01-01T00:00:00')
AND timestamp < toDateTime('2024-01-02T00:00:00')
LIMIT 100
"""
query = parse_snql_query(snql, get_dataset("events"))
print(query)
Source: snuba/request/validation.py:24
Datasets
Work with datasets and entities
Processors
Query processing pipeline