Stream-to-stream joins correlate events based on event time:
-- Join orders with payments within same time window
-- A pair matches when the order_id values are equal and the order's _tp_time
-- is at most 5 minutes before or after the payment's _tp_time (inclusive).
SELECT
    o.order_id,
    o.amount,
    p.payment_method
FROM orders AS o
INNER JOIN payments AS p
    ON o.order_id = p.order_id
WHERE o._tp_time BETWEEN p._tp_time - INTERVAL 5 MINUTE
                     AND p._tp_time + INTERVAL 5 MINUTE;
-- Attach country_name and region from country_dict to each event,
-- matching on the event's country_code.
-- LEFT JOIN keeps events whose country_code has no entry in country_dict.
SELECT
    events.event_id,
    events.country_code,
    countries.country_name,
    countries.region
FROM events
LEFT JOIN country_dict AS countries
    ON events.country_code = countries.code;
-- Count orders and payments per 1-minute tumble window, then pair the two
-- counts for windows that share the same window_start and window_end.
-- INNER JOIN: a window appears only if both sides produced a row for it.
SELECT
    orders_agg.window_start,
    orders_agg.order_count,
    payments_agg.payment_count
FROM (
    -- Orders per window, windowed on order_time
    SELECT
        window_start,
        window_end,
        count() AS order_count
    FROM tumble(orders, order_time, INTERVAL 1 MINUTE)
    GROUP BY
        window_start,
        window_end
) AS orders_agg
INNER JOIN (
    -- Payments per window, windowed on payment_time
    SELECT
        window_start,
        window_end,
        count() AS payment_count
    FROM tumble(payments, payment_time, INTERVAL 1 MINUTE)
    GROUP BY
        window_start,
        window_end
) AS payments_agg
    ON orders_agg.window_start = payments_agg.window_start
    AND orders_agg.window_end = payments_agg.window_end;
-- Find windows with highest activity
-- windowed_data counts events per 5-minute tumble window over table(events);
-- the outer query keeps every window whose count equals the overall maximum
-- (ties are all returned).
WITH windowed_data AS (
    SELECT
        window_start,
        window_end,
        count(*) AS event_count
    FROM tumble(table(events), _tp_time, INTERVAL 5 MINUTE)
    GROUP BY
        window_start,
        window_end
)
SELECT
    w1.window_start,
    w1.event_count,
    w2.max_count
FROM windowed_data AS w1
INNER JOIN (
    -- Single-row subquery: the largest per-window event count
    SELECT max(event_count) AS max_count
    FROM windowed_data
) AS w2
    ON w1.event_count = w2.max_count;
-- Compare each live sensor reading with that device's average temperature
-- over the last 24 hours, and keep readings more than 1.5x that average.
-- NOTE(review): the WHERE clause filters on hist.avg_temp, a column from the
-- right side of the LEFT JOIN. Whether readings with no 24-hour history are
-- kept or dropped depends on how the engine fills unmatched columns
-- (NULL vs. a type default) -- confirm the intended behavior.
SELECT
    live.device_id,
    live.temperature,
    hist.avg_temp
FROM sensor_readings AS live
LEFT JOIN (
    -- Per-device average over rows from the last 24 hours
    SELECT
        device_id,
        avg(temperature) AS avg_temp
    FROM table(sensor_readings)
    WHERE _tp_time > now() - INTERVAL 24 HOUR
    GROUP BY device_id
) AS hist
    ON live.device_id = hist.device_id
WHERE live.temperature > hist.avg_temp * 1.5;
-- Correlate orders with payments in real-time
-- Pairs each order with payments that share its order_id and returns only
-- the pairs whose amounts are equal.
SELECT
    o.order_id,
    o.customer_id,
    o.amount AS order_amount,
    p.payment_method,
    p.amount AS payment_amount,
    p.payment_time
FROM orders AS o
INNER JOIN payments AS p
    ON o.order_id = p.order_id
WHERE o.amount = p.amount; -- Validate amounts match
-- Correlate bids with auctions
-- Each bid is joined to its auction on auction_id and labelled by comparing
-- bid_amount with the auction's starting_price:
--   'high'   : bid_amount > 2 * starting_price
--   'normal' : bid_amount > starting_price (but not 'high')
--   'low'    : everything else
SELECT
    b.bid_id,
    b.auction_id,
    b.bidder_id,
    b.bid_amount,
    a.auction_name,
    a.starting_price,
    CASE
        WHEN b.bid_amount > a.starting_price * 2 THEN 'high'
        WHEN b.bid_amount > a.starting_price THEN 'normal'
        ELSE 'low'
    END AS bid_category
FROM bids AS b
INNER JOIN auctions AS a
    ON b.auction_id = a.auction_id;
-- Compare auction bids to maximum bids per window
-- Returns, for each hop window, the auction(s) with the most bids.
WITH auction_bids AS (
    -- Bids per auction per window from
    -- hop(bids, bid_time, INTERVAL 2 SECOND, INTERVAL 10 SECOND)
    SELECT
        auction_id,
        count(*) AS num_bids,
        window_start,
        window_end
    FROM hop(bids, bid_time, INTERVAL 2 SECOND, INTERVAL 10 SECOND)
    GROUP BY
        auction_id,
        window_start,
        window_end
),
max_bids AS (
    -- Largest per-auction bid count within each window
    SELECT
        max(num_bids) AS max_num,
        window_start,
        window_end
    FROM auction_bids
    GROUP BY
        window_start,
        window_end
)
SELECT
    ab.auction_id,
    ab.num_bids
FROM auction_bids AS ab
INNER JOIN max_bids AS mb
    ON ab.window_start = mb.window_start
    AND ab.window_end = mb.window_end
-- max_num is the maximum of num_bids for the same window, so >= selects the
-- auction(s) that reach that maximum.
WHERE ab.num_bids >= mb.max_num;