Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt

Use this file to discover all available pages before exploring further.

Timeplus Proton supports multiple methods for writing data to streams, from direct SQL inserts to streaming ingestion via REST APIs.

INSERT INTO Statement

Basic INSERT Syntax

Insert data using standard SQL syntax:
INSERT INTO stream_name (column1, column2, ...)
VALUES (value1, value2, ...);

Single Row Insert

INSERT INTO events (event_id, user_id, event_type, event_time)
VALUES ('evt_001', 12345, 'click', '2024-01-15 10:30:00');

Multiple Row Insert

Insert multiple rows in a single statement:
INSERT INTO transactions (transaction_id, user_id, amount, created_at)
VALUES 
  ('tx_001', 100, 99.99, '2024-01-15 10:00:00'),
  ('tx_002', 101, 149.50, '2024-01-15 10:05:00'),
  ('tx_003', 102, 75.25, '2024-01-15 10:10:00');

Column Specification

Explicit Columns

Specify columns explicitly for clarity:
INSERT INTO sensor_readings (sensor_id, temperature, humidity)
VALUES ('sensor_001', 72.5, 45.2);

Using Default Values

Omit columns with default values:
-- Stream definition with defaults
CREATE STREAM orders (
  order_id string,
  status string DEFAULT 'pending',
  created_at datetime64(3) DEFAULT now64(3)
);

-- Insert without specifying default columns
INSERT INTO orders (order_id) VALUES ('ord_001');
-- status will be 'pending', created_at will be current time

All Columns Insert

Insert values for all columns in order:
-- Matches column order in stream definition
INSERT INTO events VALUES 
  ('evt_001', 100, 'purchase', '2024-01-15 10:00:00');
Using INSERT without column names requires exact column order and is error-prone. Always specify column names for clarity.

INSERT Formats

JSONEachRow Format

Insert JSON data with one object per line:
INSERT INTO events FORMAT JSONEachRow
{"event_id": "evt_001", "user_id": 100, "event_type": "click"}
{"event_id": "evt_002", "user_id": 101, "event_type": "view"}

CSV Format

Insert CSV data:
INSERT INTO events (event_id, user_id, event_type) FORMAT CSV
evt_001,100,"click"
evt_002,101,"view"
evt_003,102,"purchase"

Values Format (Default)

Standard SQL VALUES syntax:
INSERT INTO events (event_id, user_id, event_type)
FORMAT Values
('evt_001', 100, 'click'),
('evt_002', 101, 'view');

INSERT FROM SELECT

Copy Historical Data

Insert data from another stream:
-- Copy specific records
INSERT INTO archive_events
SELECT * FROM table(events)
WHERE _tp_time < now() - INTERVAL 30 DAY;

-- Transform while copying
INSERT INTO summary_events (event_date, event_count)
SELECT 
  to_date(_tp_time) as event_date,
  count() as event_count
FROM table(events)
GROUP BY event_date;

Populate from External Sources

-- Insert from external table
INSERT INTO local_products
SELECT product_id, name, price
FROM table(external_catalog)
WHERE active = true;

Batch vs Streaming Inserts

Batch Inserts

Batch inserts are one-time operations:
-- Execute once and complete
INSERT INTO events (event_id, data)
VALUES ('evt_001', 'test');
Characteristics:
  • Execute synchronously
  • Return when complete
  • Suitable for bulk loading
  • Can be part of transactions

Streaming Ingestion

For continuous data ingestion, use:
  • REST API streaming inserts
  • External streams (Kafka, etc.)
  • Materialized views from other streams

REST API Ingestion

Streaming Insert Endpoint

Post data to the streaming insert endpoint:
# Single event
curl -X POST http://localhost:8123/ \
  -d "INSERT INTO events FORMAT JSONEachRow
{\"event_id\": \"evt_001\", \"user_id\": 100}"

Continuous Streaming

Stream multiple events continuously:
# Stream events from a file
curl -X POST http://localhost:8123/ \
  -H "Content-Type: application/json" \
  -d @- << EOF
INSERT INTO events FORMAT JSONEachRow
{"event_id": "evt_001", "user_id": 100, "event_type": "click"}
{"event_id": "evt_002", "user_id": 101, "event_type": "view"}
{"event_id": "evt_003", "user_id": 102, "event_type": "purchase"}
EOF

Streaming with Named Pipes

# Create named pipe
mkfifo /tmp/events.pipe

# Stream to Proton
cat /tmp/events.pipe | curl -X POST http://localhost:8123/ \
  -d "INSERT INTO events FORMAT JSONEachRow" \
  --data-binary @-

External Stream Ingestion

Kafka Integration

Create an external stream to ingest from Kafka:
CREATE EXTERNAL STREAM kafka_events (
  event_id string,
  user_id int32,
  event_type string
)
SETTINGS 
  type = 'kafka',
  brokers = 'localhost:9092',
  topic = 'user_events',
  data_format = 'JSONEachRow';

-- Data automatically flows into kafka_events
-- No explicit INSERT needed

Copying from External Streams

Copy data from external stream to internal stream:
-- Via materialized view
CREATE MATERIALIZED VIEW events_mv INTO events AS
SELECT * FROM kafka_events;

-- Or via INSERT SELECT (one-time)
INSERT INTO events
SELECT * FROM kafka_events
SETTINGS seek_to = 'earliest'
LIMIT 10000;

Data Type Considerations

Automatic Type Conversion

-- String to number conversion
INSERT INTO metrics (device_id, value)
VALUES ('device_1', '42.5');  -- '42.5' converted to number

-- Timestamp parsing
INSERT INTO events (event_id, event_time)
VALUES ('evt_001', '2024-01-15 10:30:00');  -- Parsed to datetime

Explicit Type Casting

INSERT INTO events (event_id, user_id, event_time)
VALUES (
  'evt_001',
  to_int32('12345'),
  to_datetime64('2024-01-15 10:30:00', 3)
);

Error Handling

Invalid Data

-- This will fail if constraints are violated
INSERT INTO events (event_id, user_id)
VALUES (NULL, 100);  -- Error if event_id is NOT NULL

Format Errors

-- Mismatched column count
INSERT INTO events (event_id, user_id)
VALUES ('evt_001');  -- Error: missing user_id value

Complete Examples

Bulk Loading Historical Data

-- Load sensor data from CSV file via REST API
-- File: sensor_data.csv
-- sensor_id,temperature,humidity,reading_time
-- sensor_001,72.5,45.2,2024-01-15 10:00:00
-- sensor_002,73.1,44.8,2024-01-15 10:01:00

-- Command:
curl -X POST http://localhost:8123/ \
  -H "Content-Type: text/csv" \
  -d @sensor_data.csv \
  "?query=INSERT INTO sensor_readings FORMAT CSV"

Streaming JSON Events

-- Continuous streaming from application
INSERT INTO user_events FORMAT JSONEachRow
{"user_id": 100, "action": "login", "timestamp": "2024-01-15 10:00:00"}
{"user_id": 101, "action": "view_product", "timestamp": "2024-01-15 10:00:05"}
{"user_id": 100, "action": "add_to_cart", "timestamp": "2024-01-15 10:00:10"}
{"user_id": 102, "action": "purchase", "timestamp": "2024-01-15 10:00:15"}

Transforming Data During Insert

-- Insert with transformations
INSERT INTO normalized_events (
  event_id,
  user_id,
  event_type,
  event_date,
  event_hour
)
SELECT 
  event_id,
  user_id,
  lower(event_type) as event_type,
  to_date(event_time) as event_date,
  to_hour(event_time) as event_hour
FROM table(raw_events)
WHERE event_time >= today() - INTERVAL 7 DAY;

Populating Materialized View Target

-- Target stream for aggregations
CREATE STREAM hourly_metrics (
  metric_date datetime64(3),
  device_id string,
  avg_temperature float64,
  max_temperature float64,
  reading_count int64
);

-- Materialized view populates it automatically
CREATE MATERIALIZED VIEW hourly_metrics_mv INTO hourly_metrics AS
SELECT 
  to_start_of_hour(_tp_time) as metric_date,
  device_id,
  avg(temperature) as avg_temperature,
  max(temperature) as max_temperature,
  count() as reading_count
FROM sensor_readings
GROUP BY metric_date, device_id;

-- No explicit INSERT needed - data flows automatically

Best Practices

Insert multiple rows at once for better performance:
-- Good: Batch insert
INSERT INTO events VALUES 
  ('evt_001', 100),
  ('evt_002', 101),
  ('evt_003', 102);

-- Avoid: Multiple single inserts
INSERT INTO events VALUES ('evt_001', 100);
INSERT INTO events VALUES ('evt_002', 101);
INSERT INTO events VALUES ('evt_003', 102);
  • JSONEachRow: Best for application integration
  • CSV: Efficient for bulk loads
  • Values: Good for small manual inserts
Always specify column names for maintainability:
-- Good
INSERT INTO events (event_id, user_id) VALUES ('evt_001', 100);

-- Risky
INSERT INTO events VALUES ('evt_001', 100);
For continuous data ingestion from message queues:
  • Use external streams (Kafka, etc.) instead of polling + INSERT
  • Let materialized views transform and route data
  • Reduces latency and improves throughput
Ensure data meets schema requirements:
  • Check required fields are present
  • Validate data types
  • Handle NULL values appropriately

Next Steps

Reading Data

Learn how to query streams

Aggregations

Aggregate streaming data

Materialized Views

Automate data transformation

Build docs developers (and LLMs) love