Documentation Index Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt
Use this file to discover all available pages before exploring further.
Timeplus Proton supports multiple methods for writing data to streams, from direct SQL inserts to streaming ingestion via REST APIs.
INSERT INTO Statement
Basic INSERT Syntax
Insert data using standard SQL syntax:
INSERT INTO stream_name (column1, column2, ...)
VALUES (value1, value2, ...);
Single Row Insert
INSERT INTO events (event_id, user_id, event_type, event_time)
VALUES ( 'evt_001' , 12345 , 'click' , '2024-01-15 10:30:00' );
Multiple Row Insert
Insert multiple rows in a single statement:
INSERT INTO transactions (transaction_id, user_id, amount, created_at)
VALUES
( 'tx_001' , 100 , 99 . 99 , '2024-01-15 10:00:00' ),
( 'tx_002' , 101 , 149 . 50 , '2024-01-15 10:05:00' ),
( 'tx_003' , 102 , 75 . 25 , '2024-01-15 10:10:00' );
Column Specification
Explicit Columns
Specify columns explicitly for clarity:
INSERT INTO sensor_readings (sensor_id, temperature, humidity)
VALUES ( 'sensor_001' , 72 . 5 , 45 . 2 );
Using Default Values
Omit columns with default values:
-- Stream definition with defaults
CREATE STREAM orders (
order_id string,
status string DEFAULT 'pending' ,
created_at datetime64( 3 ) DEFAULT now64( 3 )
);
-- Insert without specifying default columns
INSERT INTO orders (order_id) VALUES ( 'ord_001' );
-- status will be 'pending', created_at will be current time
All Columns Insert
Insert values for all columns in order:
-- Matches column order in stream definition
INSERT INTO events VALUES
( 'evt_001' , 100 , 'purchase' , '2024-01-15 10:00:00' );
Using INSERT without column names requires exact column order and is error-prone. Always specify column names for clarity.
Insert JSON data with one object per line:
INSERT INTO events FORMAT JSONEachRow
{ "event_id" : "evt_001" , "user_id" : 100 , "event_type" : "click" }
{ "event_id" : "evt_002" , "user_id" : 101 , "event_type" : "view" }
Insert CSV data:
INSERT INTO events (event_id, user_id, event_type) FORMAT CSV
evt_001, 100 , "click"
evt_002, 101 , "view"
evt_003, 102 , "purchase"
Standard SQL VALUES syntax:
INSERT INTO events (event_id, user_id, event_type)
FORMAT Values
( 'evt_001' , 100 , 'click' ),
( 'evt_002' , 101 , 'view' );
INSERT FROM SELECT
Copy Historical Data
Insert data from another stream:
-- Copy specific records
INSERT INTO archive_events
SELECT * FROM table (events)
WHERE _tp_time < now () - INTERVAL 30 DAY ;
-- Transform while copying
INSERT INTO summary_events (event_date, event_count)
SELECT
to_date(_tp_time) as event_date,
count () as event_count
FROM table (events)
GROUP BY event_date;
Populate from External Sources
-- Insert from external table
INSERT INTO local_products
SELECT product_id, name , price
FROM table (external_catalog)
WHERE active = true;
Batch vs Streaming Inserts
Batch Inserts
Batch inserts are one-time operations:
-- Execute once and complete
INSERT INTO events (event_id, data )
VALUES ( 'evt_001' , 'test' );
Characteristics:
Execute synchronously
Return when complete
Suitable for bulk loading
Can be part of transactions
Streaming Ingestion
For continuous data ingestion, use:
REST API streaming inserts
External streams (Kafka, etc.)
Materialized views from other streams
REST API Ingestion
Streaming Insert Endpoint
Post data to the streaming insert endpoint:
# Single event
curl -X POST http://localhost:8123/ \
-d "INSERT INTO events FORMAT JSONEachRow
{ \" event_id \" : \" evt_001 \" , \" user_id \" : 100}"
Continuous Streaming
Stream multiple events continuously:
# Stream events from a file
curl -X POST http://localhost:8123/ \
-H "Content-Type: application/json" \
-d @- << EOF
INSERT INTO events FORMAT JSONEachRow
{"event_id": "evt_001", "user_id": 100, "event_type": "click"}
{"event_id": "evt_002", "user_id": 101, "event_type": "view"}
{"event_id": "evt_003", "user_id": 102, "event_type": "purchase"}
EOF
Streaming with Named Pipes
# Create named pipe
mkfifo /tmp/events.pipe
# Stream to Proton
cat /tmp/events.pipe | curl -X POST http://localhost:8123/ \
-d "INSERT INTO events FORMAT JSONEachRow" \
--data-binary @-
External Stream Ingestion
Kafka Integration
Create an external stream to ingest from Kafka:
CREATE EXTERNAL STREAM kafka_events (
event_id string,
user_id int32,
event_type string
)
SETTINGS
type = 'kafka' ,
brokers = 'localhost:9092' ,
topic = 'user_events' ,
data_format = 'JSONEachRow' ;
-- Data automatically flows into kafka_events
-- No explicit INSERT needed
Copying from External Streams
Copy data from external stream to internal stream:
-- Via materialized view
CREATE MATERIALIZED VIEW events_mv INTO events AS
SELECT * FROM kafka_events;
-- Or via INSERT SELECT (one-time)
INSERT INTO events
SELECT * FROM kafka_events
SETTINGS seek_to = 'earliest'
LIMIT 10000 ;
Data Type Considerations
Automatic Type Conversion
-- String to number conversion
INSERT INTO metrics (device_id, value )
VALUES ( 'device_1' , '42.5' ); -- '42.5' converted to number
-- Timestamp parsing
INSERT INTO events (event_id, event_time)
VALUES ( 'evt_001' , '2024-01-15 10:30:00' ); -- Parsed to datetime
Explicit Type Casting
INSERT INTO events (event_id, user_id, event_time)
VALUES (
'evt_001' ,
to_int32( '12345' ),
to_datetime64( '2024-01-15 10:30:00' , 3 )
);
Error Handling
Invalid Data
-- This will fail if constraints are violated
INSERT INTO events (event_id, user_id)
VALUES ( NULL , 100 ); -- Error if event_id is NOT NULL
-- Mismatched column count
INSERT INTO events (event_id, user_id)
VALUES ( 'evt_001' ); -- Error: missing user_id value
Complete Examples
Bulk Loading Historical Data
-- Load sensor data from CSV file via REST API
-- File: sensor_data.csv
-- sensor_id,temperature,humidity,reading_time
-- sensor_001,72.5,45.2,2024-01-15 10:00:00
-- sensor_002,73.1,44.8,2024-01-15 10:01:00
-- Command:
curl - X POST http : // localhost: 8123 / \
- H "Content-Type: text/csv" \
- d @sensor_data.csv \
"?query=INSERT INTO sensor_readings FORMAT CSV"
Streaming JSON Events
-- Continuous streaming from application
INSERT INTO user_events FORMAT JSONEachRow
{ "user_id" : 100 , "action" : "login" , "timestamp" : "2024-01-15 10:00:00" }
{ "user_id" : 101 , "action" : "view_product" , "timestamp" : "2024-01-15 10:00:05" }
{ "user_id" : 100 , "action" : "add_to_cart" , "timestamp" : "2024-01-15 10:00:10" }
{ "user_id" : 102 , "action" : "purchase" , "timestamp" : "2024-01-15 10:00:15" }
-- Insert with transformations
INSERT INTO normalized_events (
event_id,
user_id,
event_type,
event_date,
event_hour
)
SELECT
event_id,
user_id,
lower (event_type) as event_type,
to_date(event_time) as event_date,
to_hour(event_time) as event_hour
FROM table (raw_events)
WHERE event_time >= today() - INTERVAL 7 DAY ;
Populating Materialized View Target
-- Target stream for aggregations
CREATE STREAM hourly_metrics (
metric_date datetime64( 3 ),
device_id string,
avg_temperature float64,
max_temperature float64,
reading_count int64
);
-- Materialized view populates it automatically
CREATE MATERIALIZED VIEW hourly_metrics_mv INTO hourly_metrics AS
SELECT
to_start_of_hour(_tp_time) as metric_date,
device_id,
avg (temperature) as avg_temperature,
max (temperature) as max_temperature,
count () as reading_count
FROM sensor_readings
GROUP BY metric_date, device_id;
-- No explicit INSERT needed - data flows automatically
Best Practices
Insert multiple rows at once for better performance: -- Good: Batch insert
INSERT INTO events VALUES
( 'evt_001' , 100 ),
( 'evt_002' , 101 ),
( 'evt_003' , 102 );
-- Avoid: Multiple single inserts
INSERT INTO events VALUES ( 'evt_001' , 100 );
INSERT INTO events VALUES ( 'evt_002' , 101 );
INSERT INTO events VALUES ( 'evt_003' , 102 );
Use appropriate data formats
Specify columns explicitly
Always specify column names for maintainability: -- Good
INSERT INTO events (event_id, user_id) VALUES ( 'evt_001' , 100 );
-- Risky
INSERT INTO events VALUES ( 'evt_001' , 100 );
Use external streams for continuous ingestion
For continuous data ingestion from message queues:
Use external streams (Kafka, etc.) instead of polling + INSERT
Let materialized views transform and route data
Reduces latency and improves throughput
Validate data before insertion
Ensure data meets schema requirements:
Check required fields are present
Validate data types
Handle NULL values appropriately
Next Steps
Reading Data Learn how to query streams
Aggregations Aggregate streaming data
Materialized Views Automate data transformation