Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/timeplus-io/proton/llms.txt

Use this file to discover all available pages before exploring further.

INSERT inserts data into streams and tables. Data can be inserted directly with VALUES, from a SELECT query, or in various data formats.

Syntax

INSERT INTO [TABLE | FUNCTION] [db.]stream_name [(column_list)]
[PARTITION BY expr]
[FROM INFILE 'file_path' [COMPRESSION 'compression']]
[FORMAT format_name]
[VALUES (v11, v12, ...), (v21, v22, ...), ...]
[SELECT ...]
[SETTINGS setting=value, ...];

Parameters

stream_name (identifier, required)
The name of the stream or table to insert data into.
column_list (identifier_list)
Optional list of column names. If omitted, a value is expected for every column, in the order the columns are defined.
PARTITION BY (expression)
Expression that determines the partition for each row (used with table functions).
FROM INFILE (file_path)
Path to a file to insert data from.
COMPRESSION (string)
Compression format for the input file: gzip, br, xz, zstd, lz4, bz2, deflate, or none.
FORMAT (format_name)
Data format for parsing input. Examples: JSONEachRow, CSV, TabSeparated, Avro, Protobuf.
VALUES (value_list)
Inline data values to insert.
SELECT (query)
SELECT query whose results will be inserted.

Inserting Values

Basic VALUES Insert

INSERT INTO devices (device, temperature)
VALUES ('device1', 25.5), ('device2', 30.2), ('device3', 28.7);

Insert All Columns

INSERT INTO user_events
VALUES
    (1001, 'click', now64(3)),
    (1002, 'view', now64(3)),
    (1001, 'purchase', now64(3));

Insert with Expressions

INSERT INTO metrics (name, value, timestamp)
VALUES
    ('cpu_usage', rand() % 100, now()),
    ('memory_usage', rand() % 100, now()),
    ('disk_usage', rand() % 100, now());

Inserting from SELECT

Insert Query Results

INSERT INTO device_summary
SELECT
    device,
    count(*) AS event_count,
    avg(temperature) AS avg_temp
FROM table(devices)
GROUP BY device;

Insert from Another Stream

INSERT INTO filtered_events
SELECT *
FROM raw_events
WHERE event_type = 'important';

Insert from External Stream

INSERT INTO local_events
SELECT
    raw:user_id::int64 AS user_id,
    raw:event_type AS event_type,
    raw:timestamp AS timestamp
FROM kafka_events
SETTINGS seek_to='earliest';

Inserting from Files

Insert from CSV File

INSERT INTO devices
FROM INFILE '/path/to/data.csv'
FORMAT CSV;

Insert from Compressed File

INSERT INTO events
FROM INFILE '/path/to/data.json.gz'
COMPRESSION 'gzip'
FORMAT JSONEachRow;

Insert Specific Columns from File

INSERT INTO devices (device, temperature)
FROM INFILE '/path/to/data.csv'
FORMAT CSV;

Data Formats

JSONEachRow Format

INSERT INTO events FORMAT JSONEachRow
{"user_id":1001,"event":"click","timestamp":"2024-01-01 10:00:00"}
{"user_id":1002,"event":"view","timestamp":"2024-01-01 10:01:00"}
{"user_id":1003,"event":"purchase","timestamp":"2024-01-01 10:02:00"};

CSV Format

INSERT INTO devices FORMAT CSV
device1,25.5,2024-01-01 10:00:00
device2,30.2,2024-01-01 10:01:00
device3,28.7,2024-01-01 10:02:00;

TabSeparated Format

INSERT INTO devices FORMAT TabSeparated
device1	25.5	2024-01-01 10:00:00
device2	30.2	2024-01-01 10:01:00
device3	28.7	2024-01-01 10:02:00;

RawBLOB Format

INSERT INTO logs (raw) FORMAT RawBLOB
This is raw log data
Another line of log data;

Streaming Ingestion

REST API Ingestion

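Insert data via HTTP POST. Pass the INSERT statement in the query URL parameter and send the rows as the request body (combining -d with --data-binary would make curl join the two payloads with "&" and corrupt the data):
curl 'http://localhost:8123/?query=INSERT%20INTO%20devices%20FORMAT%20JSONEachRow' \
  --data-binary @data.json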

Continuous Insertion with Materialized View

-- Create external stream
CREATE EXTERNAL STREAM kafka_raw (raw string)
SETTINGS
    type='kafka',
    brokers='localhost:9092',
    topic='raw-events';

-- Create target stream
CREATE STREAM parsed_events (
    user_id int64,
    event_type string,
    timestamp datetime64(3)
);

-- Materialized view continuously inserts parsed data
CREATE MATERIALIZED VIEW mv_parse INTO parsed_events AS
SELECT
    raw:user_id::int64 AS user_id,
    raw:event_type AS event_type,
    to_time(raw:timestamp) AS timestamp
FROM kafka_raw;

Insert into External Streams

Insert to Kafka

CREATE EXTERNAL STREAM kafka_output (
    key string,
    value string
)
SETTINGS
    type='kafka',
    brokers='localhost:9092',
    topic='output-topic';

INSERT INTO kafka_output
SELECT
    device AS key,
    to_json_string(map('device', device, 'temp', temperature)) AS value
FROM devices;

Insert to ClickHouse

CREATE EXTERNAL TABLE ch_table
SETTINGS
    type='clickhouse',
    address='localhost:9000',
    table='events';

INSERT INTO ch_table
SELECT * FROM table(local_events);

Insert with Settings

Set Format-Specific Settings

-- SETTINGS goes before FORMAT: everything after the format name is read as inline data
INSERT INTO events
SETTINGS
    input_format_skip_unknown_fields=1,
    input_format_import_nested_json=1
FORMAT JSONEachRow
{"user_id":1001,"event":"click","extra_field":"ignored"};

Set Kafka Producer Settings

INSERT INTO kafka_stream
SELECT * FROM source_stream
SETTINGS one_message_per_row=true;

Insert to Table Functions

Insert with Partition Expression

INSERT INTO FUNCTION s3(
    'https://bucket.s3.amazonaws.com/data.parquet',
    'AccessKey',
    'SecretKey',
    'Parquet'
)
PARTITION BY to_date(timestamp)
SELECT * FROM events;

Column List Variations

Insert Specific Columns

INSERT INTO devices (device, temperature)
VALUES ('device1', 25.5);
-- timestamp and other columns use default values

Insert with Column Reordering

INSERT INTO events (event_type, user_id, timestamp)
VALUES ('click', 1001, now64(3));

Insert with Wildcards and Expressions

INSERT INTO target_stream
SELECT *, 'processed' AS status
FROM source_stream;

Best Practices

  1. Batch inserts for better performance when inserting multiple rows
  2. Use appropriate formats - JSONEachRow for JSON, CSV for comma-separated, RawBLOB for raw text
  3. Specify column lists when not inserting all columns or when column order differs
  4. Use materialized views for continuous streaming ingestion from external sources
  5. Set format settings when dealing with messy or variable input data
  6. Use compression when inserting large files to reduce I/O

Examples

Bulk Load Historical Data

INSERT INTO user_events
FROM INFILE '/data/events-2024.csv.gz'
COMPRESSION 'gzip'
SETTINGS input_format_skip_unknown_fields=1
FORMAT CSV;

Real-time Transformation Pipeline

-- Read from Kafka, transform, write to another Kafka topic
CREATE EXTERNAL STREAM input_kafka (raw string)
SETTINGS type='kafka', brokers='localhost:9092', topic='input';

CREATE EXTERNAL STREAM output_kafka (enriched string)
SETTINGS type='kafka', brokers='localhost:9092', topic='output';

CREATE MATERIALIZED VIEW mv_transform INTO output_kafka AS
SELECT
    to_json_string(
        map(
            'original', raw,
            'processed_at', to_string(now64(3)),
            'uppercase', upper(raw)
        )
    ) AS enriched
FROM input_kafka;

Multi-format Data Ingestion

-- Insert JSON data
INSERT INTO logs FORMAT JSONEachRow
{"level":"info","message":"Application started"};

-- Insert CSV data
INSERT INTO logs (level, message) FORMAT CSV
error,"Connection failed"
warn,"Retry attempt 1";

-- Insert raw text
INSERT INTO logs (message) FORMAT RawBLOB
Debug: Processing request #12345;

Notes

  • INSERT is synchronous by default - it waits for data to be written
  • For streaming ingestion, use materialized views for continuous processing
  • External streams support INSERT for writing to Kafka, Pulsar, ClickHouse, etc.
  • The _tp_time column is automatically set to the current time if not provided
  • Use SETTINGS seek_to='earliest' when inserting from external streams to process historical data

See Also

Build docs developers (and LLMs) love