External streams allow Timeplus Proton to read from and write to external systems like Kafka, Pulsar, Redpanda, and HTTP endpoints without data duplication. They act as virtual streams that proxy to external sources.
-- Basic query examples against streams.
-- Each statement is independent and terminated by its own semicolon.

-- Scan incoming events (streaming)
SELECT *
FROM kafka_events;

-- Filter and transform: keep only 'purchase' events and render the
-- timestamp column as a string.
SELECT
    user_id,
    event_type,
    to_string(timestamp) AS event_time
FROM user_events
WHERE event_type = 'purchase';

-- Live aggregation: per event type, the number of events and the number
-- of distinct users.
SELECT
    event_type,
    count() AS total,
    count(DISTINCT user_id) AS unique_users
FROM user_events
GROUP BY event_type;
Seek Position
Consumer Group
-- Choosing where a query starts reading, via the per-query seek_to setting.

-- Start from earliest messages
SELECT *
FROM kafka_events
SETTINGS seek_to='earliest';

-- Start from latest (default)
SELECT *
FROM kafka_events
SETTINGS seek_to='latest';

-- Start from specific timestamp
-- NOTE(review): confirm that a bare numeric string is interpreted as an
-- epoch-millisecond timestamp rather than as a partition offset.
SELECT *
FROM kafka_events
SETTINGS seek_to='1640000000000'; -- Unix timestamp in ms
-- ETL example: read raw events from one Kafka topic, mask the IP address,
-- and write the result to another topic.

-- Read raw events from Kafka (each message is exposed as one string column)
CREATE EXTERNAL STREAM frontend_events (
    raw string
)
SETTINGS
    type='kafka',
    brokers='redpanda:9092',
    topic='owlshop-frontend-events';

-- Write processed events to another topic, serialized as JSONEachRow
CREATE EXTERNAL STREAM target (
    _tp_time datetime64(3),
    url string,
    method string,
    ip string
)
SETTINGS
    type='kafka',
    brokers='redpanda:9092',
    topic='masked-fe-event',
    data_format='JSONEachRow';

-- ETL pipeline with data masking: extract fields from the raw payload and
-- forward them into the target stream.
CREATE MATERIALIZED VIEW mv INTO target AS
SELECT
    now64() AS _tp_time,
    raw:requestedUrl AS url,
    raw:method AS method,
    lower(hex(md5(raw:ipAddress))) AS ip -- Hash IP address
FROM frontend_events;
-- CDC example: turn Debezium change events from Kafka into a changelog
-- stream that holds the current state of the customers table.
-- Three materialized views feed the same target, one per Debezium operation
-- code ('c', 'u', 'd'); each row carries _tp_delta = 1 (add) or -1 (retract).

-- External stream for Debezium CDC (each message is one raw string)
CREATE EXTERNAL STREAM customers_cdc (
    raw string
)
SETTINGS
    type='kafka',
    brokers='redpanda:9092',
    topic='dbserver1.inventory.customers';

-- Changelog stream to maintain current state, keyed by id
CREATE STREAM customers (
    id int,
    first_name string,
    last_name string,
    email string
)
PRIMARY KEY id
SETTINGS
    mode='changelog_kv',
    version_column='_tp_time';

-- Process INSERT events: emit the "after" image with _tp_delta = 1
CREATE MATERIALIZED VIEW mv_customers_c INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.after.id::int AS id,
    raw:payload.after.first_name AS first_name,
    raw:payload.after.last_name AS last_name,
    raw:payload.after.email AS email,
    1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='c'
SETTINGS seek_to='earliest';

-- Process UPDATE events (emit old and new): the "before" image is paired
-- with -1 and the "after" image with +1, then the pair is expanded into
-- two output rows with array_join.
-- NOTE(review): this relies on the repeated array_join(changes) calls
-- expanding in lockstep (one row per array element) — confirm.
CREATE MATERIALIZED VIEW mv_customers_u INTO customers AS
WITH cdc_changes AS (
    SELECT
        to_time(raw:payload.ts_ms) AS ts_ms,
        [(raw:payload.before, -1::int8), (raw:payload.after, 1::int8)] AS changes
    FROM customers_cdc
    WHERE raw:payload.op = 'u'
    SETTINGS seek_to = 'earliest'
)
SELECT
    ts_ms AS _tp_time,
    array_join(changes).1:id::int AS id,
    array_join(changes).1:first_name AS first_name,
    array_join(changes).1:last_name AS last_name,
    array_join(changes).1:email AS email,
    array_join(changes).2 AS _tp_delta
FROM cdc_changes;

-- Process DELETE events: emit the "before" image with _tp_delta = -1
CREATE MATERIALIZED VIEW mv_customers_d INTO customers AS
SELECT
    to_time(raw:payload.ts_ms) AS _tp_time,
    raw:payload.before.id::int AS id,
    raw:payload.before.first_name AS first_name,
    raw:payload.before.last_name AS last_name,
    raw:payload.before.email AS email,
    -1::int8 AS _tp_delta
FROM customers_cdc
WHERE raw:payload.op='d'
SETTINGS seek_to='earliest';
-- Reading messages as a single raw string and extracting fields at query time.

-- Use raw string and parse manually
CREATE EXTERNAL STREAM kafka_raw (
    raw string
)
SETTINGS
    type='kafka',
    brokers='localhost:9092',
    topic='events';

-- Parse with error handling: id goes through try_cast, while name uses a
-- plain ::string cast. The raw message is returned alongside for inspection.
-- NOTE(review): presumably try_cast yields NULL/default instead of failing
-- on a malformed id — confirm against the function reference.
SELECT
    raw,
    try_cast(raw:id AS uint64) AS id,
    raw:name::string AS name
FROM kafka_raw;