Streams are the fundamental data structures in Timeplus Proton for storing and processing event data.

Basic CREATE STREAM Syntax

The basic syntax for creating a stream:
CREATE STREAM [IF NOT EXISTS] stream_name (
  column1 type1 [DEFAULT expr1],
  column2 type2 [DEFAULT expr2],
  ...
) 
[ENGINE = engine_name]
[SETTINGS key1=value1, key2=value2];

Simple Stream Example

CREATE STREAM events (
  event_id string,
  user_id int32,
  event_type string,
  event_time datetime64(3)
);
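
As a minimal sketch of how the stream above might be used, assuming a running Proton server (the row values are made up for illustration): a plain SELECT on a stream is a long-running streaming query, while wrapping the stream in table() reads the data stored so far and then returns.

```sql
-- Append one event (sample values are hypothetical).
INSERT INTO events (event_id, user_id, event_type, event_time)
VALUES ('e-1001', 42, 'login', '2024-01-01 12:00:00.000');

-- Streaming query: keeps running and emits new events as they arrive.
SELECT event_id, user_id, event_type FROM events;

-- Historical query: table() reads what is already stored, then finishes.
SELECT count() FROM table(events);
```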

Column Types and Defaults

Supported Data Types

Proton supports the standard SQL data types, written with lowercase type names:
CREATE STREAM sensor_data (
  device_id string,           -- String type
  temperature float32,        -- 32-bit float
  humidity float64,           -- 64-bit float
  reading_count int64,        -- 64-bit integer
  is_active bool,             -- Boolean
  location string,            -- String
  measured_at datetime64(3),  -- Timestamp with millisecond precision
  metadata string             -- JSON data as string
);
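
Because metadata stores JSON as a plain string, fields have to be extracted at query time. The sketch below assumes the json_extract_string function is available in your Proton build and that the payload contains a key named firmware; the key name is hypothetical.

```sql
-- Assumption: metadata holds JSON such as {"firmware": "1.2.0"}.
-- The key 'firmware' is hypothetical and only used for illustration.
SELECT
  device_id,
  temperature,
  json_extract_string(metadata, 'firmware') AS firmware
FROM sensor_data
WHERE is_active;
```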

Default Values

Specify default values for columns:
CREATE STREAM orders (
  order_id string,
  customer_id int32,
  amount float64,
  status string DEFAULT 'pending',
  order_time datetime64(3) DEFAULT now64(3),
  random_id int32 DEFAULT rand()
);
Every stream also has a reserved _tp_time column, which Proton adds automatically. It holds each event's timestamp and defaults to the ingestion time when no value is supplied.
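
To illustrate how the defaults and _tp_time behave, here is a small sketch against the orders stream above. The inserted values are hypothetical, and the historical query may need a moment after the insert before the row becomes visible.

```sql
-- Only the required columns are supplied; status, order_time and random_id
-- fall back to their DEFAULT expressions.
INSERT INTO orders (order_id, customer_id, amount)
VALUES ('o-1', 7, 19.99);

-- _tp_time can be selected even though it was never declared in CREATE STREAM.
SELECT _tp_time, order_id, status, order_time
FROM table(orders);
```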

CREATE RANDOM STREAM

Random streams generate synthetic data for testing and development:
CREATE RANDOM STREAM metrics (
  device_id int DEFAULT rand() % 10,
  cpu_usage float32 DEFAULT rand() % 100,
  memory_usage float32 DEFAULT rand() % 100
) 
SETTINGS eps = 5;  -- Generate 5 events per second

Random Stream Settings

  • eps (events per second): controls the data generation rate
  • No external data source is required, which makes random streams useful for testing queries
  • Columns can use any of the standard column types
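
A random stream can be queried like any other stream. The sketch below assumes the metrics random stream defined above; the 5-second window size is an arbitrary choice for illustration.

```sql
-- Streaming tail of the generated rows; cancel the query to stop it.
SELECT device_id, cpu_usage, memory_usage FROM metrics;

-- Average CPU per device over 5-second tumbling windows.
-- With no time column given, tumble() windows on _tp_time.
SELECT window_start, device_id, avg(cpu_usage) AS avg_cpu
FROM tumble(metrics, 5s)
GROUP BY window_start, device_id;
```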

Random Stream Example with Functions

CREATE RANDOM STREAM test_data (
  id int DEFAULT rand() % 100,
  category string DEFAULT ['A', 'B', 'C', 'D'][rand() % 4 + 1],
  value float64 DEFAULT rand() % 1000,
  timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 10;

Storage Engines

Default Stream Engine

By default, streams use the Stream engine:
CREATE STREAM events (
  id string,
  data string
) ENGINE = Stream;

Engine Settings

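Configure engine-specific settings with a SETTINGS clause. The two settings below cap how much data the stream's log store retains, by size and by age: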
CREATE STREAM events (
  id string,
  timestamp datetime64(3)
)
SETTINGS 
  logstore_retention_bytes = 1073741824,  -- 1GB retention
  logstore_retention_ms = 86400000;       -- 24 hours
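
The retention values are raw byte and millisecond counts. As a quick check of where the numbers above come from, the arithmetic can be run as an ordinary query:

```sql
-- 1 GiB expressed in bytes, and 24 hours expressed in milliseconds.
SELECT
  1024 * 1024 * 1024  AS retention_bytes,  -- 1073741824
  24 * 60 * 60 * 1000 AS retention_ms;     -- 86400000
```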

Partitioning and Ordering

Partition By

Partition a stream so that queries filtering on the partition expression can skip data they do not need:
CREATE STREAM user_events (
  user_id int32,
  event_type string,
  event_time datetime64(3)
)
PARTITION BY to_YYYYMM(event_time)
ORDER BY (user_id, event_time);
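
As a sketch of the kind of query this layout is meant to help, the historical scan below filters on both the partitioning column and the leading sort column. The date range and user ID are hypothetical.

```sql
-- Historical scan limited to one month and one user.
SELECT event_type, count() AS events
FROM table(user_events)
WHERE event_time >= '2024-01-01 00:00:00'
  AND event_time <  '2024-02-01 00:00:00'
  AND user_id = 42
GROUP BY event_type;
```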

Primary Key

Define a primary key for efficient lookups:
CREATE STREAM metrics (
  device_id string,
  metric_name string,
  value float64,
  timestamp datetime64(3)
)
PRIMARY KEY (device_id, metric_name)
ORDER BY (device_id, metric_name, timestamp);
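
A lookup that filters on the primary key columns might look like the sketch below. The device and metric names are hypothetical.

```sql
-- Latest 10 readings for one device and metric, read from stored data.
SELECT timestamp, value
FROM table(metrics)
WHERE device_id = 'dev-1' AND metric_name = 'cpu'
ORDER BY timestamp DESC
LIMIT 10;
```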

Advanced Stream Types

External Streams

External streams connect to external data sources:
CREATE EXTERNAL STREAM kafka_events (
  event_id string,
  payload string
)
SETTINGS 
  type = 'kafka',
  brokers = 'localhost:9092',
  topic = 'events',
  data_format = 'JSONEachRow';
External streams integrate Proton with Kafka, Redpanda, and other message brokers. Proton queries the topic in place rather than storing a second copy of the data.
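
Once defined, an external stream is queried like a regular stream. The sketch below assumes a broker is reachable at the address configured above. The seek_to query setting is an assumption about your Proton version, so check that it is supported before relying on it.

```sql
-- Streaming read of messages from the Kafka topic.
SELECT event_id, payload FROM kafka_events;

-- Assumption: seek_to = 'earliest' replays the topic from its beginning.
SELECT event_id, payload
FROM kafka_events
SETTINGS seek_to = 'earliest';
```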

Materialized Views as Streams

Create streams that are populated by materialized views:
-- Target stream
CREATE STREAM aggregated_metrics (
  device_id string,
  avg_value float64,
  window_start datetime64(3),
  window_end datetime64(3)
);

-- Materialized view that populates it
CREATE MATERIALIZED VIEW metrics_mv INTO aggregated_metrics AS
SELECT 
  device_id,
  avg(value) as avg_value,
  window_start,
  window_end
FROM tumble(raw_metrics, timestamp, INTERVAL 1 MINUTE)
GROUP BY device_id, window_start, window_end;
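
The materialized view above reads from raw_metrics, which is not defined on this page. A minimal sketch of a compatible source stream, with the column names inferred from the view's SELECT list, could be:

```sql
-- Hypothetical source stream; create it before the materialized view.
CREATE STREAM IF NOT EXISTS raw_metrics (
  device_id string,
  value float64,
  timestamp datetime64(3)
);

-- Tail the per-minute aggregates as the view writes them to the target stream.
SELECT device_id, avg_value, window_start, window_end
FROM aggregated_metrics;
```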

IF NOT EXISTS

Prevent errors when creating streams that might already exist:
CREATE STREAM IF NOT EXISTS events (
  id string,
  data string
);

Complete Examples

IoT Sensor Stream

CREATE STREAM iot_sensors (
  sensor_id string,
  location string,
  temperature float32,
  humidity float32,
  pressure float32,
  battery_level int32 DEFAULT 100,
  reading_time datetime64(3) DEFAULT now64(3)
)
PARTITION BY to_YYYYMMDD(reading_time)
ORDER BY (sensor_id, reading_time)
SETTINGS 
  logstore_retention_ms = 604800000;  -- 7 days retention
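
As an illustrative query over this stream, the sketch below computes per-sensor averages in one-minute tumbling windows keyed on reading_time. The window size and the chosen aggregates are arbitrary.

```sql
-- One-minute averages per sensor, windowed on the reading_time column.
SELECT
  window_start,
  sensor_id,
  avg(temperature) AS avg_temperature,
  max(humidity)    AS max_humidity
FROM tumble(iot_sensors, reading_time, 1m)
GROUP BY window_start, sensor_id;
```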

E-commerce Transactions

CREATE STREAM transactions (
  transaction_id string,
  user_id int64,
  product_id string,
  amount float64,
  currency string DEFAULT 'USD',
  status string DEFAULT 'pending',
  created_at datetime64(3) DEFAULT now64(3),
  metadata string  -- JSON metadata
)
PRIMARY KEY (transaction_id)
ORDER BY (transaction_id, created_at);  -- the primary key must be a prefix of the sorting key
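
A simple report over the stored transactions might look like the sketch below. The status value 'completed' is hypothetical, since this page only shows the default 'pending'.

```sql
-- Totals per currency for transactions already stored in the stream.
SELECT currency, count() AS tx_count, sum(amount) AS total_amount
FROM table(transactions)
WHERE status = 'completed'
GROUP BY currency;
```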

Random Test Data Stream

CREATE RANDOM STREAM test_events (
  user_id int DEFAULT rand() % 1000,
  action string DEFAULT ['click', 'view', 'purchase', 'signup'][rand() % 4 + 1],
  value float64 DEFAULT rand() % 100,
  timestamp datetime64(3) DEFAULT now64(3)
)
SETTINGS eps = 100;  -- 100 events/second for load testing
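
One way to sanity-check the generation rate is to count events per one-second window. This is a sketch that assumes the test_events stream above; each count should be close to the configured eps.

```sql
-- Events generated per 1-second tumbling window (windowed on _tp_time).
SELECT window_start, count() AS events
FROM tumble(test_events, 1s)
GROUP BY window_start;
```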

Viewing Stream Schema

Inspect stream definitions:
-- Show create statement
SHOW CREATE STREAM events;

-- Describe stream columns
DESCRIBE events;

-- List all streams
SHOW STREAMS;

Dropping Streams

Remove streams when no longer needed:
DROP STREAM IF EXISTS events;
Dropping a stream permanently deletes all data. This operation cannot be undone.

Best Practices

Use the smallest data type that fits your needs:
  • int32 vs int64 for integers
  • float32 vs float64 for floating-point numbers
  • datetime64(3) for millisecond precision timestamps

Configure retention based on your use case:
SETTINGS 
  logstore_retention_bytes = 10737418240,  -- 10GB
  logstore_retention_ms = 2592000000;      -- 30 days

Partition by date for better query performance:
PARTITION BY to_YYYYMMDD(_tp_time)

Use DEFAULT expressions to auto-populate columns:
created_at datetime64(3) DEFAULT now64(3),
id string DEFAULT generate_uuid_v4()
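
Putting these practices together, a stream definition might look like the sketch below. The stream name page_views and its columns are hypothetical; every clause is taken from the examples on this page.

```sql
-- Hypothetical stream combining the practices above.
CREATE STREAM IF NOT EXISTS page_views (
  id string DEFAULT generate_uuid_v4(),       -- auto-populated identifier
  user_id int32,                              -- 32-bit is enough for this ID range
  duration_ms int32,
  created_at datetime64(3) DEFAULT now64(3)   -- millisecond precision
)
PARTITION BY to_YYYYMMDD(_tp_time)
ORDER BY (user_id, created_at)
SETTINGS
  logstore_retention_bytes = 10737418240,  -- 10 GiB
  logstore_retention_ms = 2592000000;      -- 30 days
```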

Next Steps

Reading Data

Learn how to query streams

Writing Data

Insert data into streams
