Documentation Index
Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt
Use this file to discover all available pages before exploring further.
INSERT inserts data into streams and tables. Data can be inserted directly with VALUES, from a SELECT query, or in various data formats.
Syntax
INSERT INTO [TABLE | FUNCTION] [db.]stream_name [(column_list)]
[PARTITION BY expr]
[FROM INFILE 'file_path' [COMPRESSION 'compression']]
[FORMAT format_name]
[VALUES (v11, v12, ...), (v21, v22, ...), ...]
[SELECT ...]
[SETTINGS setting=value, ...];
Parameters
The name of the stream or table to insert data into.
Optional list of column names. If omitted, all columns are expected in order.
Expression to determine partition for each row (used with table functions).
Path to a file to insert data from.
Compression format for the input file: gzip, br, xz, zstd, lz4, bz2, deflate, none
Data format for parsing input. Examples: JSONEachRow, CSV, TabSeparated, Avro, Protobuf
Inline data values to insert.
SELECT query whose results will be inserted.
Inserting Values
Basic VALUES Insert
INSERT INTO devices (device, temperature)
VALUES ('device1', 25.5), ('device2', 30.2), ('device3', 28.7);
Insert All Columns
INSERT INTO user_events
VALUES
(1001, 'click', now64(3)),
(1002, 'view', now64(3)),
(1001, 'purchase', now64(3));
Insert with Expressions
INSERT INTO metrics (name, value, timestamp)
VALUES
('cpu_usage', rand() % 100, now()),
('memory_usage', rand() % 100, now()),
('disk_usage', rand() % 100, now());
Inserting from SELECT
Insert Query Results
INSERT INTO device_summary
SELECT
device,
count(*) AS event_count,
avg(temperature) AS avg_temp
FROM table(devices)
GROUP BY device;
Insert from Another Stream
INSERT INTO filtered_events
SELECT *
FROM raw_events
WHERE event_type = 'important';
Insert from External Stream
INSERT INTO local_events
SELECT
raw:user_id::int64 AS user_id,
raw:event_type AS event_type,
raw:timestamp AS timestamp
FROM kafka_events
SETTINGS seek_to='earliest';
Inserting from Files
Insert from CSV File
INSERT INTO devices
FROM INFILE '/path/to/data.csv'
FORMAT CSV;
Insert from Compressed File
INSERT INTO events
FROM INFILE '/path/to/data.json.gz'
COMPRESSION 'gzip'
FORMAT JSONEachRow;
Insert Specific Columns from File
INSERT INTO devices (device, temperature)
FROM INFILE '/path/to/data.csv'
FORMAT CSV;
INSERT INTO events FORMAT JSONEachRow
{"user_id":1001,"event":"click","timestamp":"2024-01-01 10:00:00"}
{"user_id":1002,"event":"view","timestamp":"2024-01-01 10:01:00"}
{"user_id":1003,"event":"purchase","timestamp":"2024-01-01 10:02:00"};
INSERT INTO devices FORMAT CSV
device1,25.5,2024-01-01 10:00:00
device2,30.2,2024-01-01 10:01:00
device3,28.7,2024-01-01 10:02:00;
INSERT INTO devices FORMAT TabSeparated
device1 25.5 2024-01-01 10:00:00
device2 30.2 2024-01-01 10:01:00
device3 28.7 2024-01-01 10:02:00;
INSERT INTO logs (raw) FORMAT RawBLOB
This is raw log data
Another line of log data;
Streaming Ingestion
REST API Ingestion
Insert data via HTTP POST:
curl -X POST http://localhost:8123 \
-d "INSERT INTO devices FORMAT JSONEachRow" \
--data-binary @data.json
Continuous Insertion with Materialized View
-- Create external stream
CREATE EXTERNAL STREAM kafka_raw (raw string)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='raw-events';
-- Create target stream
CREATE STREAM parsed_events (
user_id int64,
event_type string,
timestamp datetime64(3)
);
-- Materialized view continuously inserts parsed data
CREATE MATERIALIZED VIEW mv_parse INTO parsed_events AS
SELECT
raw:user_id::int64 AS user_id,
raw:event_type AS event_type,
to_time(raw:timestamp) AS timestamp
FROM kafka_raw;
Insert into External Streams
Insert to Kafka
CREATE EXTERNAL STREAM kafka_output (
key string,
value string
)
SETTINGS
type='kafka',
brokers='localhost:9092',
topic='output-topic';
INSERT INTO kafka_output
SELECT
device AS key,
to_json_string(map('device', device, 'temp', temperature)) AS value
FROM devices;
Insert to ClickHouse
CREATE EXTERNAL TABLE ch_table
SETTINGS
type='clickhouse',
address='localhost:9000',
table='events';
INSERT INTO ch_table
SELECT * FROM table(local_events);
Insert with Settings
INSERT INTO events FORMAT JSONEachRow
SETTINGS
input_format_skip_unknown_fields=1,
input_format_import_nested_json=1
{"user_id":1001,"event":"click","extra_field":"ignored"};
Set Kafka Producer Settings
INSERT INTO kafka_stream
SELECT * FROM source_stream
SETTINGS one_message_per_row=true;
Insert to Table Functions
Insert with Partition Expression
INSERT INTO FUNCTION s3(
'https://bucket.s3.amazonaws.com/data.parquet',
'AccessKey',
'SecretKey',
'Parquet'
)
PARTITION BY to_date(timestamp)
SELECT * FROM events;
Column List Variations
Insert Specific Columns
INSERT INTO devices (device, temperature)
VALUES ('device1', 25.5);
-- timestamp and other columns use default values
Insert with Column Reordering
INSERT INTO events (event_type, user_id, timestamp)
VALUES ('click', 1001, now64(3));
Insert with Wildcards and Expressions
INSERT INTO target_stream
SELECT *, 'processed' AS status
FROM source_stream;
Best Practices
-
Batch inserts for better performance when inserting multiple rows
-
Use appropriate formats - JSONEachRow for JSON, CSV for comma-separated, RawBLOB for raw text
-
Specify column lists when not inserting all columns or when column order differs
-
Use materialized views for continuous streaming ingestion from external sources
-
Set format settings when dealing with messy or variable input data
-
Use compression when inserting large files to reduce I/O
Examples
Bulk Load Historical Data
INSERT INTO user_events
FROM INFILE '/data/events-2024.csv.gz'
COMPRESSION 'gzip'
FORMAT CSV
SETTINGS input_format_skip_unknown_fields=1;
-- Read from Kafka, transform, write to another Kafka topic
CREATE EXTERNAL STREAM input_kafka (raw string)
SETTINGS type='kafka', brokers='localhost:9092', topic='input';
CREATE EXTERNAL STREAM output_kafka (enriched string)
SETTINGS type='kafka', brokers='localhost:9092', topic='output';
CREATE MATERIALIZED VIEW mv_transform INTO output_kafka AS
SELECT
to_json_string(
map(
'original', raw,
'processed_at', to_string(now64(3)),
'uppercase', upper(raw)
)
) AS enriched
FROM input_kafka;
-- Insert JSON data
INSERT INTO logs FORMAT JSONEachRow
{"level":"info","message":"Application started"};
-- Insert CSV data
INSERT INTO logs (level, message) FORMAT CSV
error,"Connection failed"
warn,"Retry attempt 1";
-- Insert raw text
INSERT INTO logs (message) FORMAT RawBLOB
Debug: Processing request #12345;
Notes
- INSERT is synchronous by default - it waits for data to be written
- For streaming ingestion, use materialized views for continuous processing
- External streams support INSERT for writing to Kafka, Pulsar, ClickHouse, etc.
- The
_tp_time column is automatically set to the current time if not provided
- Use
SETTINGS seek_to='earliest' when inserting from external streams to process historical data
See Also