Documentation Index
Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt
Use this file to discover all available pages before exploring further.
Streams are the fundamental data structures in Timeplus Proton for storing and processing event data.
Basic CREATE STREAM Syntax
The basic syntax for creating a stream:
CREATE STREAM [IF NOT EXISTS] stream_name (
column1 type1 [DEFAULT expr1],
column2 type2 [DEFAULT expr2],
...
)
[ENGINE = engine_name]
[SETTINGS key1=value1, key2=value2];
Simple Stream Example
CREATE STREAM events (
event_id string,
user_id int32,
event_type string,
event_time datetime64(3)
);
Column Types and Defaults
Supported Data Types
Proton supports standard SQL data types:
CREATE STREAM sensor_data (
device_id string, -- String type
temperature float32, -- 32-bit float
humidity float64, -- 64-bit float
reading_count int64, -- 64-bit integer
is_active bool, -- Boolean
location string, -- String
measured_at datetime64(3), -- Timestamp with millisecond precision
metadata string -- JSON data as string
);
Default Values
Specify default values for columns:
CREATE STREAM orders (
order_id string,
customer_id int32,
amount float64,
status string DEFAULT 'pending',
order_time datetime64(3) DEFAULT now64(3),
random_id int32 DEFAULT rand()
);
The reserved _tp_time column is automatically added to every stream and represents ingestion time.
CREATE RANDOM STREAM
Random streams generate synthetic data for testing and development:
CREATE RANDOM STREAM metrics (
device_id int DEFAULT rand() % 10,
cpu_usage float32 DEFAULT rand() % 100,
memory_usage float32 DEFAULT rand() % 100
)
SETTINGS eps = 5; -- Generate 5 events per second
Random Stream Settings
eps (events per second): Controls data generation rate
- Useful for testing queries without external data sources
- Supports all standard column types
Random Stream Example with Functions
CREATE RANDOM STREAM test_data (
id int DEFAULT rand() % 100,
category string DEFAULT ['A', 'B', 'C', 'D'][rand() % 4 + 1],
value float64 DEFAULT rand() % 1000,
timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 10;
Storage Engines
Default Stream Engine
By default, streams use the Stream engine:
CREATE STREAM events (
id string,
data string
) ENGINE = Stream;
Engine Settings
Configure engine-specific settings:
CREATE STREAM events (
id string,
timestamp datetime64(3)
)
SETTINGS
logstore_retention_bytes = 1073741824, -- 1GB retention
logstore_retention_ms = 86400000; -- 24 hours
Partitioning and Ordering
Partition By
Partition streams for better query performance:
CREATE STREAM user_events (
user_id int32,
event_type string,
event_time datetime64(3)
)
PARTITION BY to_YYYYMM(event_time)
ORDER BY (user_id, event_time);
Primary Key
Define primary key for efficient lookups:
CREATE STREAM metrics (
device_id string,
metric_name string,
value float64,
timestamp datetime64(3)
)
PRIMARY KEY (device_id, metric_name)
ORDER BY (device_id, metric_name, timestamp);
Advanced Stream Types
External Streams
External streams connect to external data sources:
CREATE EXTERNAL STREAM kafka_events (
event_id string,
payload string
)
SETTINGS
type = 'kafka',
brokers = 'localhost:9092',
topic = 'events',
data_format = 'JSONEachRow';
External streams enable integration with Kafka, Redpanda, and other message brokers without data duplication.
Materialized Views as Streams
Create streams that are populated by materialized views:
-- Target stream
CREATE STREAM aggregated_metrics (
device_id string,
avg_value float64,
window_start datetime64(3),
window_end datetime64(3)
);
-- Materialized view that populates it
CREATE MATERIALIZED VIEW metrics_mv INTO aggregated_metrics AS
SELECT
device_id,
avg(value) as avg_value,
window_start,
window_end
FROM tumble(raw_metrics, timestamp, INTERVAL 1 MINUTE)
GROUP BY device_id, window_start, window_end;
IF NOT EXISTS
Prevent errors when creating streams that might already exist:
CREATE STREAM IF NOT EXISTS events (
id string,
data string
);
Complete Examples
IoT Sensor Stream
CREATE STREAM iot_sensors (
sensor_id string,
location string,
temperature float32,
humidity float32,
pressure float32,
battery_level int32 DEFAULT 100,
reading_time datetime64(3) DEFAULT now64(3)
)
PARTITION BY to_YYYYMMDD(reading_time)
ORDER BY (sensor_id, reading_time)
SETTINGS
logstore_retention_ms = 604800000; -- 7 days retention
E-commerce Transactions
CREATE STREAM transactions (
transaction_id string,
user_id int64,
product_id string,
amount float64,
currency string DEFAULT 'USD',
status string DEFAULT 'pending',
created_at datetime64(3) DEFAULT now64(3),
metadata string -- JSON metadata
)
PRIMARY KEY transaction_id
ORDER BY (user_id, created_at);
Random Test Data Stream
CREATE RANDOM STREAM test_events (
user_id int DEFAULT rand() % 1000,
action string DEFAULT ['click', 'view', 'purchase', 'signup'][rand() % 4 + 1],
value float64 DEFAULT rand() % 100,
timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 100; -- 100 events/second for load testing
Viewing Stream Schema
Inspect stream definitions:
-- Show create statement
SHOW CREATE STREAM events;
-- Describe stream columns
DESCRIBE events;
-- List all streams
SHOW STREAMS;
Dropping Streams
Remove streams when no longer needed:
DROP STREAM IF EXISTS events;
Dropping a stream permanently deletes all data. This operation cannot be undone.
Best Practices
Choose appropriate data types
Use the smallest data type that fits your needs:
int32 vs int64 for integers
float32 vs float64 for decimals
datetime64(3) for millisecond precision timestamps
Configure retention based on your use case:SETTINGS
logstore_retention_bytes = 10737418240, -- 10GB
logstore_retention_ms = 2592000000; -- 30 days
Use partitioning for time-series data
Partition by date for better query performance:PARTITION BY to_YYYYMMDD(_tp_time)
Use DEFAULT expressions to auto-populate columns:created_at datetime64(3) DEFAULT now64(3),
id string DEFAULT generate_uuid_v4()
Next Steps
Reading Data
Learn how to query streams
Writing Data
Insert data into streams