Documentation Index
Fetch the complete documentation index at: https://mintlify.com/getsentry/snuba/llms.txt
Use this file to discover all available pages before exploring further.
Query processors transform and validate queries as they flow through the execution pipeline.
Overview
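Snuba’s query pipeline runs each query through several stages in order: entity processing, storage processing, and execution. Each processing stage applies its own set of processors.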
Processor Interface
All processors implement a common interface:
from abc import ABC, abstractmethod

from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings


class QueryProcessor(ABC):
    """
    Common interface implemented by every query processor.

    A processor receives a query and transforms it in place; it returns
    nothing, so callers keep using the same query object afterwards.
    """

    @abstractmethod
    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings,
    ) -> None:
        """
        Process and transform the query in-place.

        Args:
            query: The query to process.
            query_settings: Query execution settings.
        """
        raise NotImplementedError
Entity Processing Stage
Entity processors run after query parsing, before storage selection.
from snuba.pipeline.stages.query_processing import EntityProcessingStage
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.request import Request
from snuba.utils.metrics.timer import Timer

# `request` (a Request) and `timer` (a Timer) are assumed to already exist
# in the calling code. The stage takes its input wrapped in a
# QueryPipelineResult rather than the bare request.
stage = EntityProcessingStage()
pipeline_input = QueryPipelineResult(
    data=request,
    query_settings=request.query_settings,
    timer=timer,
    error=None,
)

# Execute entity processing stage
result = stage.execute(pipeline_input)
Source: snuba/pipeline/stages/query_processing.py:14
Common Entity Processors
TimeSeriesProcessor validates and processes time-range queries:
- Ensures required time columns are present
- Validates time range is reasonable
- Adds time-based optimizations
from snuba.datasets.processors.timeseries_processor import TimeSeriesProcessor

# Apply to an existing `query` / `query_settings` pair; the query is
# modified in place.
time_series = TimeSeriesProcessor()
time_series.process_query(query, query_settings)
ProjectRateLimiter enforces project-level rate limits:
from snuba.datasets.processors.project_rate_limiter import ProjectRateLimiter
# Modifies `query` in place using the current `query_settings`.
rate_limiter = ProjectRateLimiter()
rate_limiter.process_query(query, query_settings)
PrewhereProcessor optimizes queries using ClickHouse PREWHERE:
- Moves selective conditions to PREWHERE
- Improves query performance
from snuba.query.processors.prewhere import PrewhereProcessor

# Modifies `query` in place using the current `query_settings`.
prewhere = PrewhereProcessor()
prewhere.process_query(query, query_settings)
Storage Processing Stage
Storage processors run after storage selection, before query execution.
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.pipeline.stages.query_processing import StorageProcessingStage

# Pipeline stages take a QueryPipelineResult as input (the entity stage is
# called the same way, and in the full pipeline this input is simply the
# entity stage's result), so the ClickHouse query is wrapped first.
# `clickhouse_query`, `query_settings` and `timer` are assumed to exist.
stage_input = QueryPipelineResult(
    data=clickhouse_query,
    query_settings=query_settings,
    timer=timer,
    error=None,
)

# Execute storage processing stage
stage = StorageProcessingStage()
result = stage.execute(stage_input)
Source: snuba/pipeline/stages/query_processing.py:16
Common Storage Processors
MappingOptimizer optimizes column mapping and projection:
from snuba.datasets.processors.mapping_optimizer import MappingOptimizer
# `column_mapping` is supplied by the storage configuration (assumed to exist here).
optimizer = MappingOptimizer(column_mapping)
optimizer.process_query(query, query_settings)
TableRateLimiter enforces table-level rate limits:
from snuba.datasets.processors.table_rate_limiter import TableRateLimiter
# Modifies `query` in place using the current `query_settings`.
table_limiter = TableRateLimiter()
table_limiter.process_query(query, query_settings)
ConsistencyEnforcer enforces consistency settings:
from snuba.datasets.processors.consistency import ConsistencyEnforcer
# Modifies `query` in place using the current `query_settings`.
enforcer = ConsistencyEnforcer()
enforcer.process_query(query, query_settings)
Creating Custom Processors
You can create custom processors for specific use cases:
from snuba.query.processors import QueryProcessor
from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings
from snuba.query.expressions import Column, FunctionCall, Literal
from snuba.query.conditions import (
    ConditionFunctions,
    binary_condition,
    combine_and_conditions,
)


class CustomProjectFilter(QueryProcessor):
    """
    Automatically adds a project_id filter to all queries.

    The filter restricts ``project_id`` to ``allowed_projects`` and is
    AND-ed with whatever condition the query already has.
    """

    def __init__(self, allowed_projects: list[int]):
        # Copy so that later mutation of the caller's list cannot silently
        # change the filter of an already-registered processor.
        self.allowed_projects = list(allowed_projects)

    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings,
    ) -> None:
        # A Literal holds one scalar value, so the IN right-hand side is
        # built as a tuple(...) function call of one literal per project.
        allowed = FunctionCall(
            None,
            "tuple",
            tuple(Literal(None, project_id) for project_id in self.allowed_projects),
        )
        project_condition = binary_condition(
            ConditionFunctions.IN,
            Column("project_id", None, "project_id"),
            allowed,
        )

        # Combine with the existing condition, if there is one.
        existing = query.get_condition()
        if existing is not None:
            new_condition = combine_and_conditions([existing, project_condition])
        else:
            new_condition = project_condition
        query.set_ast_condition(new_condition)


# Usage
processor = CustomProjectFilter(allowed_projects=[1, 2, 3])
processor.process_query(query, query_settings)
Processor Examples
Add Default Conditions
from snuba.query.processors import QueryProcessor
from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings
from snuba.query.conditions import (
    ConditionFunctions,
    binary_condition,
    combine_and_conditions,
)
from snuba.query.expressions import Column, Literal


class DefaultConditionsProcessor(QueryProcessor):
    """
    Adds a ``deleted = 0`` condition to every query.

    The condition is AND-ed with any existing query condition; when the
    query has no condition it becomes the only one.
    """

    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings,
    ) -> None:
        # Add default deleted = 0 condition
        deleted_condition = binary_condition(
            ConditionFunctions.EQ,
            Column("deleted", None, "deleted"),
            Literal(None, 0),
        )

        existing = query.get_condition()
        if existing is not None:
            new_condition = combine_and_conditions([existing, deleted_condition])
        else:
            new_condition = deleted_condition
        query.set_ast_condition(new_condition)
from snuba.query.processors import QueryProcessor
from snuba.query import ProcessableQuery
from snuba.query.query_settings import QuerySettings
from snuba.query.expressions import Column, Expression


class ColumnRenameProcessor(QueryProcessor):
    """
    Rewrites references to renamed columns everywhere in a query.

    Args:
        mapping: Old column name -> new column name.
    """

    def __init__(self, mapping: dict[str, str]):
        # Copy so later changes to the caller's dict do not leak in.
        self.mapping = dict(mapping)

    def _rename(self, expr: Expression) -> Expression:
        """Return ``expr`` with its column name remapped if it is a mapped Column."""
        if isinstance(expr, Column) and expr.column_name in self.mapping:
            # Keep the original alias so anything referring to it (and the
            # result keys derived from it) stays valid after the rename.
            return Column(expr.alias, expr.table_name, self.mapping[expr.column_name])
        return expr

    def process_query(
        self,
        query: ProcessableQuery,
        query_settings: QuerySettings,
    ) -> None:
        # Applying the rename only to the top-level selected expressions and
        # the condition root misses Columns nested inside function calls
        # (e.g. the column in `equals(old_column, 1)`). transform_expressions
        # applies the function to every expression in the query tree.
        query.transform_expressions(self._rename)


# Usage
processor = ColumnRenameProcessor({
    "old_column": "new_column",
    "deprecated_field": "current_field",
})
import logging

from snuba.query.processors import QueryProcessor

# Created once at import time instead of on every process_query call.
logger = logging.getLogger(__name__)


class MetadataProcessor(QueryProcessor):
    """Tags each query with custom experiment metadata and logs it."""

    def process_query(self, query, query_settings):
        # Add custom metadata to query
        query.set_experiment("custom_processor", True)
        query.set_experiment("processor_version", "1.0")
        # Lazy %-style args: the query is only rendered to a string when
        # INFO logging is actually enabled.
        logger.info("Processing query %s", query)
Subscription Processors
Subscription-specific processors handle recurring queries:
from typing import Any, Mapping, Optional

from snuba.datasets.entity_subscriptions.processors import SubscriptionProcessor
from snuba.query import ProcessableQuery


class CustomSubscriptionProcessor(SubscriptionProcessor):
    """Template subscription processor; fill in the subscription-specific logic."""

    def process(
        self,
        query: ProcessableQuery,
        metadata: Mapping[str, Any],
        offset: Optional[int],
    ) -> None:
        """
        Process subscription query.

        Args:
            query: The subscription query.
            metadata: Subscription metadata.
            offset: Kafka offset for deduplication.
        """
        # Add subscription-specific logic here.

    def to_dict(self, metadata: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Serialize subscription metadata to a dict.

        Only ``custom_field`` is read from ``metadata``; it is returned under
        the ``custom_data`` key (``None`` when absent).
        """
        return {"custom_data": metadata.get("custom_field")}
Source: entities expose their subscription processors through get_subscription_processors().
Query Validation
Subscription validators check query and metadata constraints, raising InvalidSubscriptionError when a check fails:
from typing import Any, Mapping

from snuba.datasets.entity_subscriptions.validators import (
    EntitySubscriptionValidator,
    InvalidSubscriptionError,
)
from snuba.query import ProcessableQuery


class CustomValidator(EntitySubscriptionValidator):
    """Rejects subscriptions that select too many columns or lack required metadata."""

    # Upper bound on selected columns; override in a subclass to change it.
    MAX_SELECTED_COLUMNS = 10

    def validate(
        self,
        query: ProcessableQuery,
        metadata: Mapping[str, Any],
    ) -> None:
        """
        Validate a subscription query and its metadata.

        Raises:
            InvalidSubscriptionError: If more than ``MAX_SELECTED_COLUMNS``
                columns are selected, or ``required_field`` is missing
                from ``metadata``.
        """
        # Validate query constraints
        selected = query.get_selected_columns()
        if len(selected) > self.MAX_SELECTED_COLUMNS:
            raise InvalidSubscriptionError(
                f"Too many selected columns (max {self.MAX_SELECTED_COLUMNS})"
            )

        # Validate metadata
        if "required_field" not in metadata:
            raise InvalidSubscriptionError(
                "Missing required_field in metadata"
            )
Processor Registration
Processors are registered with entities and storages:
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.pluggable_entity import PluggableEntity

# `storage` is assumed to be an already-configured storage, and the
# processor/validator classes are the custom examples from this page.
# NOTE: EntityKey.CUSTOM stands in for your entity's key — confirm it exists.
entity = PluggableEntity(
    entity_key=EntityKey.CUSTOM,
    storages=[storage],
    query_processors=[
        CustomProjectFilter([1, 2, 3]),
        DefaultConditionsProcessor(),
    ],
    subscription_processors=[
        CustomSubscriptionProcessor(),
    ],
    subscription_validators=[
        CustomValidator(),
    ],
)
Pipeline Execution
Running the full pipeline chains the three stages, passing each stage's result to the next:
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.pipeline.stages.query_processing import (
    EntityProcessingStage,
    StorageProcessingStage,
)
from snuba.pipeline.stages.query_execution import ExecutionStage

# `request`, `timer` and `query_metadata` are assumed to exist in the
# calling code. Each stage consumes the result produced by the previous one.

# Stage 1: Entity processing
entity_stage = EntityProcessingStage()
pipeline_input = QueryPipelineResult(
    data=request,
    query_settings=request.query_settings,
    timer=timer,
    error=None,
)
entity_result = entity_stage.execute(pipeline_input)

# Stage 2: Storage processing
storage_stage = StorageProcessingStage()
storage_result = storage_stage.execute(entity_result)

# Stage 3: Execution
execution_stage = ExecutionStage(
    request.attribution_info,
    query_metadata=query_metadata,
    robust=False,
)
final_result = execution_stage.execute(storage_result)
Source: snuba/web/query.py:36
Debugging Processors
Enable debug mode to inspect the effect of processors, including the generated SQL, query stats, and experiments:
from dataclasses import replace

from snuba.query.query_settings import HTTPQuerySettings
from snuba.web.query import run_query

settings = HTTPQuerySettings(debug=True)

# The pipeline reads settings from `request.query_settings`, so the debug
# settings only take effect once they are attached to the request that is
# executed. (Assumes `request` is a dataclass instance — TODO confirm — and
# that `dataset` and `timer` already exist.)
debug_request = replace(request, query_settings=settings)

# Execute query with debug enabled
result = run_query(dataset, debug_request, timer)

# Check generated SQL and stats
print(f"SQL: {result.extra['sql']}")
print(f"Stats: {result.extra['stats']}")
print(f"Experiments: {result.extra['experiments']}")
Datasets
Dataset and entity configuration
Query Builder
Build queries programmatically