Materialized views in Timeplus Proton are continuous queries that transform and route streaming data in real time. Unlike traditional databases, where materialized views are periodic snapshots, Proton’s materialized views process data incrementally as it arrives.
-- External table pointing to ClickHouse.
-- NOTE(review): the password is inlined for illustration only; presumably a real
-- deployment would not commit credentials in DDL -- confirm how secrets are supplied.
CREATE EXTERNAL TABLE ch_analytics
SETTINGS
    type = 'clickhouse',
    address = 'clickhouse.example.com:9000',
    user = 'writer',
    password = 'secret',
    database = 'analytics',
    table = 'hourly_metrics';

-- Aggregate access logs per hour and service, and write the result rows to
-- ClickHouse through the ch_analytics external table.
CREATE MATERIALIZED VIEW mv_to_ch INTO ch_analytics AS
SELECT
    to_start_of_hour(timestamp) AS hour,
    service,
    count() AS total_requests,
    count_if(status >= 500) AS errors,  -- responses with status 500 and above
    avg(latency_ms) AS avg_latency
FROM access_logs
GROUP BY hour, service;
-- External stream from Debezium: every Kafka message is exposed as one string
-- column named raw, which the views below query with JSON path syntax.
CREATE EXTERNAL STREAM customers_cdc (
    raw string
)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:9092',
    topic = 'dbserver1.inventory.customers';

-- Changelog stream to maintain current state, keyed by id.
CREATE STREAM customers (
    id int,
    first_name string,
    last_name string,
    email string
)
PRIMARY KEY id
SETTINGS
    mode = 'changelog_kv',
    version_column = '_tp_time';

-- Process INSERT operations (op='c' for create): emit the "after" image.
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.after.id::int AS id,
    raw:payload.after.first_name AS first_name,
    raw:payload.after.last_name AS last_name,
    raw:payload.after.email AS email,
    1::int8 AS _tp_delta  -- +1 for insert
FROM customers_cdc
WHERE raw:payload.op = 'c'
SETTINGS seek_to = 'earliest';

-- Process UPDATE operations (op='u'): each update is expanded into two rows,
-- the "before" image with _tp_delta = -1 and the "after" image with _tp_delta = +1.
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH cdc_changes AS (
    SELECT
        ts_ms,
        array_join(changes) AS change,  -- one output row per (image, delta) tuple
        change.1 AS val,                -- the row image (before or after)
        change.2 AS _tp_delta           -- -1 or +1
    FROM (
        SELECT
            to_time(raw:payload.ts_ms) AS ts_ms,
            [(raw:payload.before, -1::int8), (raw:payload.after, 1::int8)] AS changes
        FROM customers_cdc
        WHERE raw:payload.op = 'u'
        SETTINGS seek_to = 'earliest'
    )
)
SELECT
    ts_ms AS _tp_time,
    val:id::int AS id,  -- same cast as the create/delete views and the id column
    val:first_name AS first_name,
    val:last_name AS last_name,
    val:email AS email,
    _tp_delta
FROM cdc_changes;

-- Process DELETE operations (op='d'): emit the "before" image.
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.before.id::int AS id,
    raw:payload.before.first_name AS first_name,
    raw:payload.before.last_name AS last_name,
    raw:payload.before.email AS email,
    -1::int8 AS _tp_delta  -- -1 for delete
FROM customers_cdc
WHERE raw:payload.op = 'd'
SETTINGS seek_to = 'earliest';
Understanding _tp_delta in CDC
The _tp_delta column controls changelog semantics:
INSERT: Emit row with _tp_delta=1
UPDATE: Emit old row with _tp_delta=-1, then new row with _tp_delta=1
DELETE: Emit row with _tp_delta=-1
Proton’s changelog engine automatically maintains the current state by applying deltas.
-- Route to multiple destinations based on conditions.
-- All three views read from the same events stream.

-- High-priority events to alerting system
CREATE MATERIALIZED VIEW mv_alerts INTO alert_stream AS
SELECT *
FROM events
WHERE priority = 'high';

-- All events to data lake
CREATE MATERIALIZED VIEW mv_archive INTO s3_sink AS
SELECT *
FROM events;

-- Metrics to monitoring: event count per 1-minute tumbling window
CREATE MATERIALIZED VIEW mv_metrics INTO metrics_stream AS
SELECT
    window_start,
    count() AS event_count
FROM tumble(events, 1m)
GROUP BY window_start;
-- Deduplicate events based on unique ID: number the rows of each event_id by
-- _tp_time and keep only the first one.
-- NOTE(review): the outer SELECT * also forwards the helper column rn, so
-- unique_events needs a matching column -- confirm, or list the columns explicitly.
-- NOTE(review): Proton appears to offer a dedup() table function for this
-- use case -- verify whether it is preferable to row_number() here.
CREATE MATERIALIZED VIEW mv_dedup INTO unique_events AS
SELECT *
FROM (
    SELECT
        *,
        row_number() OVER (PARTITION BY event_id ORDER BY _tp_time) AS rn
    FROM raw_events
)
WHERE rn = 1;
-- Group events into sessions with 30-minute timeout, producing one row per
-- (user_id, session_id) with the first/last event time and the event count.
-- NOTE(review): the output alias session_start has the same name as the
-- session_start(...) function used for session_id -- confirm this does not clash.
-- NOTE(review): verify session_start(...) against Proton's session window
-- support; this snippet has not been checked against a running server.
CREATE MATERIALIZED VIEW mv_sessions INTO user_sessions AS
SELECT
    user_id,
    session_start(user_id, timestamp, INTERVAL 30 MINUTE) AS session_id,
    min(timestamp) AS session_start,
    max(timestamp) AS session_end,
    count() AS event_count
FROM user_events
GROUP BY user_id, session_id;
-- Use windows to bound state size: average temperature per device over
-- 5-minute tumbling windows.
CREATE MATERIALIZED VIEW mv_bounded INTO metrics AS
SELECT
    window_start,
    device,
    avg(temperature) AS avg_temp
FROM tumble(sensors, 5m)
GROUP BY window_start, device;
-- State cleared after each window
Unbounded aggregations (global GROUP BY without windows) accumulate state indefinitely. Use windowing or periodic cleanup for production.
-- Moderate: Aggregation with low cardinality.
-- Global (non-windowed) aggregation: one running count per service.
CREATE MATERIALIZED VIEW mv_medium INTO metrics AS
SELECT
    service,  -- 10-100 unique values
    count() AS requests
FROM logs
GROUP BY service;
-- Heavy: High cardinality aggregation.
-- Global (non-windowed) aggregation: one running state entry per user_id.
CREATE MATERIALIZED VIEW mv_slow INTO user_stats AS
SELECT
    user_id,  -- Millions of unique values
    count() AS events,
    avg(session_duration) AS avg_duration
FROM events
GROUP BY user_id;
-- Consider windowing to limit state
-- Atomic replacement (drops and recreates) of an existing materialized view.
-- NOTE(review): presumably the aggregation state of the previous definition is
-- discarded on replacement -- confirm before using on a production view.
CREATE OR REPLACE MATERIALIZED VIEW mv_metrics INTO metrics AS
SELECT
    service,
    count() AS requests
FROM logs
GROUP BY service;
-- Use try_cast to handle malformed data: extract typed fields from the raw
-- payload and forward only rows whose user_id could be cast.
CREATE MATERIALIZED VIEW mv_safe INTO clean_data AS
SELECT
    try_cast(raw:user_id AS uint64) AS user_id,
    try_cast(raw:amount AS decimal(10,2)) AS amount,
    timestamp
FROM raw_stream
WHERE user_id IS NOT NULL;  -- Filter out failed user_id casts
-- Only user_id is checked: a row whose amount fails to cast is still forwarded
-- (with a NULL amount, assuming try_cast yields NULL on failure -- confirm).
-- Route failed records to DLQ.
-- An INSERT statement cannot be embedded in a SELECT expression, so routing is
-- done with two materialized views over the same source: one forwards rows
-- whose id casts cleanly, the other forwards the remaining rows to the DLQ.
CREATE STREAM dlq (
    raw_data string,
    error_message string,
    timestamp datetime64(3)
);

-- Valid records: id could be cast to uint64.
CREATE MATERIALIZED VIEW mv_with_dlq INTO target AS
SELECT
    try_cast(raw:id AS uint64) AS id,
    raw:data AS data
FROM source
WHERE id IS NOT NULL;

-- Failed records: keep the original payload plus a reason and the time of routing.
CREATE MATERIALIZED VIEW mv_with_dlq_errors INTO dlq AS
SELECT
    raw AS raw_data,
    'Invalid ID' AS error_message,
    now64() AS timestamp
FROM source
WHERE try_cast(raw:id AS uint64) IS NULL;