-- External stream over the Kafka topic 'my-events' on localhost:9092,
-- exposing a single string column named raw.
CREATE EXTERNAL STREAM frontend_events (raw string)
SETTINGS
    type = 'kafka',
    brokers = 'localhost:9092',
    topic = 'my-events';

-- Query live data from Kafka
SELECT * FROM frontend_events;
-- Create a materialized view that processes and writes to Kafka.
-- It extracts userId, eventType and timestamp from the raw payload and
-- drops rows whose eventType is 'heartbeat'.
-- NOTE(review): output_stream is not created in this snippet; presumably it
-- must already exist as an external stream before this runs -- confirm.
CREATE MATERIALIZED VIEW enriched_events_mv
INTO EXTERNAL STREAM output_stream
AS
SELECT
    raw:userId AS user_id,
    raw:eventType AS event_type,
    raw:timestamp AS event_time
FROM frontend_events
WHERE raw:eventType != 'heartbeat';
For a complete Kafka example with Docker Compose, check out the ecommerce example.
Here’s a more advanced example showing how to build a streaming ETL pipeline:
-- Read from AWS MSK using IAM Role
CREATE EXTERNAL STREAM aws_msk_stream (
    device string,
    temperature float
)
SETTINGS
    type = 'kafka',
    brokers = 'prefix.kafka.us-west-2.amazonaws.com:9098',
    topic = 'sensor-data',
    security_protocol = 'SASL_SSL',
    sasl_mechanism = 'AWS_MSK_IAM';

-- Write to ClickHouse
-- The password below is a placeholder; substitute the real credential.
CREATE EXTERNAL TABLE ch_target
SETTINGS
    type = 'clickhouse',
    address = 'clickhouse.example.com:9000',
    user = 'default',
    password = '***',
    table = 'device_metrics';

-- Create a long-running materialized view for aggregation.
-- Per device and per 10s tumble window, it writes the average, minimum and
-- maximum temperature into ch_target.
CREATE MATERIALIZED VIEW mv_msk_to_ch
INTO ch_target
AS
SELECT
    window_start AS timestamp,
    device,
    avg(temperature) AS avg_temperature,
    min(temperature) AS min_temperature,
    max(temperature) AS max_temperature
FROM tumble(aws_msk_stream, 10s)
GROUP BY
    window_start,
    device;
Then create an external stream and start querying:
-- Create external stream to read from Kafka
CREATE EXTERNAL STREAM frontend_events (raw string)
SETTINGS
    type = 'kafka',
    brokers = 'redpanda:9092',
    topic = 'owlshop-frontend-events';

-- Filter events by JSON attributes
SELECT
    _tp_time,
    raw:ipAddress,
    raw:requestedUrl
FROM frontend_events
WHERE raw:method = 'POST';

-- Show a live ASCII bar chart:
-- count events per HTTP method and render each count with bar().
-- NOTE(review): LIMIT 5 BY emit_version() presumably keeps the top 5 rows of
-- each emitted result set -- confirm against the Proton docs.
SELECT
    raw:method,
    count() AS cnt,
    bar(cnt, 0, 40, 5) AS bar
FROM frontend_events
GROUP BY raw:method
ORDER BY cnt DESC
LIMIT 5 BY emit_version();