The Arrow Flight protocol provides a high-performance gRPC interface for executing SQL queries against blockchain datasets. It supports both batch and streaming query modes, returning results in Apache Arrow columnar format.

Overview

Arrow Flight is Apache Arrow’s RPC framework for high-performance data transfer over gRPC. This is the recommended transport for:
  • Production workloads requiring high throughput
  • Streaming queries with real-time data updates
  • Applications that can consume Arrow data directly
  • Use cases requiring reorg detection and watermarking

Endpoint

  • Protocol: gRPC (Arrow Flight)
  • Default Port: 1602
  • Default Address: 0.0.0.0:1602
  • Configuration: Set via flight_addr in config.toml or AMP_CONFIG_FLIGHT_ADDR environment variable
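
For example, a minimal config.toml entry might look like the following (the surrounding section layout may differ in your deployment):

```toml
# Bind the Arrow Flight gRPC server to all interfaces on the default port
flight_addr = "0.0.0.0:1602"
```

The same setting can be supplied through the environment as AMP_CONFIG_FLIGHT_ADDR=0.0.0.0:1602.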

Request Flow

  1. Client calls getFlightInfo with SQL query in FlightDescriptor
  2. Server parses SQL, builds query plan, returns FlightInfo with schema and ticket
  3. Client calls doGet with the ticket to retrieve results
  4. Server executes query and streams FlightData messages
  5. Each message contains Arrow RecordBatch with result rows

Query Modes

Batch Queries

Default mode when no SETTINGS clause is specified. Query runs once and returns complete results.
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:1602")

# Get flight info
info = client.get_flight_info(
    flight.FlightDescriptor.for_command(
        b"SELECT * FROM eth_rpc.blocks LIMIT 10"
    )
)

# Retrieve results
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()
print(table.to_pandas())

Streaming Queries

Continuous execution with incremental results. Enabled by adding SETTINGS stream = true to your SQL query.
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:1602")

info = client.get_flight_info(
    flight.FlightDescriptor.for_command(
        b"SELECT * FROM eth_rpc.blocks SETTINGS stream = true"
    )
)

reader = client.do_get(info.endpoints[0].ticket)
for batch in reader:
    # Process each batch as it arrives
    print(f"Received {batch.data.num_rows} rows")

Headers

Custom headers can be used to control query behavior:
  • amp-stream: Override streaming mode. Set to true or 1 to enable streaming.
  • amp-resume: Resume streaming from a cursor. Value is a JSON-encoded cursor object.
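
As a sketch, headers are plain (name, value) byte pairs. The cursor payload below is a hypothetical illustration based on the block-range metadata described later; in practice, persist the cursor your client received from an earlier stream rather than constructing one by hand:

```python
import json

# amp-stream: force streaming mode without a SETTINGS clause
stream_header = (b"amp-stream", b"true")

# amp-resume: resume from a previously observed cursor.
# NOTE: this cursor shape is illustrative only; use the cursor
# your client actually received from the server.
cursor = {"ranges": [{"network": "eth", "numbers": {"start": 100, "end": 102}}]}
resume_header = (b"amp-resume", json.dumps(cursor).encode("utf-8"))

# Pass the pairs to pyarrow via flight.FlightCallOptions(headers=[...])
headers = [stream_header, resume_header]
print(headers)
```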

Streaming Metadata

For streaming queries, each FlightData message includes app_metadata containing block range information:
{
  "ranges": [
    {
      "network": "eth",
      "numbers": { "start": 100, "end": 102 },
      "hash": "0x..."
    }
  ],
  "ranges_complete": true
}
  • ranges: Array of block ranges covered by this batch
  • ranges_complete: true when the message is a watermark confirming the listed ranges are complete, false for data batches
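
A consumer can use this metadata to separate watermarks from data batches. The sketch below parses an app_metadata payload with the standard library; the payload shown mirrors the example above:

```python
import json

def classify(app_metadata: bytes) -> str:
    """Return 'watermark' or 'data' based on the ranges_complete flag."""
    meta = json.loads(app_metadata)
    return "watermark" if meta.get("ranges_complete") else "data"

payload = b'{"ranges": [{"network": "eth", "numbers": {"start": 100, "end": 102}}], "ranges_complete": true}'
print(classify(payload))  # watermark
```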

Client Libraries

Python (pyarrow)

Install PyArrow, which includes the Flight client:
pip install pyarrow
Basic usage:
import pyarrow.flight as flight

# Connect to server
client = flight.connect("grpc://localhost:1602")

# Execute query
info = client.get_flight_info(
    flight.FlightDescriptor.for_command(b"SELECT * FROM eth_rpc.blocks LIMIT 10")
)

# Get results
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()

# Convert to pandas DataFrame
df = table.to_pandas()
print(df)

Rust (arrow-flight)

Add to your Cargo.toml:
[dependencies]
arrow-flight = "53"
tonic = "0.12"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
Basic usage:
use arrow_flight::decode::FlightRecordBatchStream;
use arrow_flight::error::FlightError;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::FlightDescriptor;
use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = FlightServiceClient::connect("http://localhost:1602").await?;

    let descriptor = FlightDescriptor::new_cmd(
        "SELECT * FROM eth_rpc.blocks LIMIT 10".as_bytes().to_vec(),
    );

    let flight_info = client.get_flight_info(descriptor).await?.into_inner();
    let ticket = flight_info.endpoint[0]
        .ticket
        .clone()
        .ok_or("endpoint is missing a ticket")?;
    let flight_data = client.do_get(ticket).await?.into_inner();

    // Decode the raw FlightData stream into Arrow RecordBatches
    let mut batches =
        FlightRecordBatchStream::new_from_flight_data(flight_data.map_err(FlightError::from));

    while let Some(batch) = batches.next().await {
        let batch = batch?;
        println!("Received {} rows", batch.num_rows());
    }

    Ok(())
}

Amp Rust Client (amp-client)

The official Amp Rust client provides additional features like reorg detection and state management:
use amp_client::AmpClient;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = AmpClient::from_endpoint("http://localhost:1602").await?;
    
    // Batch query
    let mut stream = client.query("SELECT * FROM eth_rpc.blocks LIMIT 10").await?;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        println!("Received {} rows", batch.num_rows());
    }
    
    Ok(())
}
Streaming with reorg detection:
use amp_client::{AmpClient, ProtocolMessage};
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = AmpClient::from_endpoint("http://localhost:1602").await?;
    
    let mut stream = client.stream("SELECT * FROM eth_rpc.logs").await?;
    
    while let Some(msg) = stream.next().await {
        match msg? {
            ProtocolMessage::Data { batch, ranges } => {
                println!("Received {} rows covering blocks {:?}", batch.num_rows(), ranges);
            }
            ProtocolMessage::Reorg { previous, incoming, invalidation } => {
                println!("Reorg detected! Invalidation ranges: {:?}", invalidation);
            }
            ProtocolMessage::Watermark { ranges } => {
                println!("Watermark: ranges {:?} are complete", ranges);
            }
        }
    }
    
    Ok(())
}

Authentication

Bearer Token (Python)

import pyarrow.flight as flight

client = flight.connect("grpc://localhost:1602")

# Add bearer token to headers
headers = [(b"authorization", b"Bearer YOUR_TOKEN_HERE")]

info = client.get_flight_info(
    flight.FlightDescriptor.for_command(b"SELECT * FROM eth_rpc.blocks LIMIT 10"),
    options=flight.FlightCallOptions(headers=headers)
)

Bearer Token (Rust)

use amp_client::AmpClient;

let mut client = AmpClient::from_endpoint("http://localhost:1602").await?;
client.set_token("YOUR_TOKEN_HERE");

// All subsequent requests will include the token
let mut stream = client.query("SELECT * FROM eth_rpc.blocks LIMIT 10").await?;

Response Format

All responses are returned as Apache Arrow RecordBatches with the following characteristics:
  • Columnar format: Data organized in columns for efficient processing
  • Typed schema: Each column has a specific Arrow data type
  • Zero-copy: Efficient memory usage and serialization
  • Streaming: Results can be processed incrementally

Limitations

  • Arrow format only: Results are always Apache Arrow RecordBatches (use JSONL endpoint for JSON output)
  • gRPC transport: Requires gRPC client library support
  • Schema required: Client must handle Arrow schema for data interpretation

Performance Considerations

Message Size Limits

The default gRPC max decoding message size is 32 MiB. For large result sets, you may need to increase this:
use amp_client::AmpClient;

let client = AmpClient::from_endpoint_with_max_decoding_message_size(
    "http://localhost:1602",
    64 * 1024 * 1024  // 64 MiB
).await?;

Connection Pooling

Reuse the same client instance for multiple queries to avoid connection overhead:
import pyarrow.flight as flight

# Create once, reuse many times
client = flight.connect("grpc://localhost:1602")

# Execute multiple queries
for query in queries:
    info = client.get_flight_info(
        flight.FlightDescriptor.for_command(query.encode())
    )
    reader = client.do_get(info.endpoints[0].ticket)
    # Process results...

Error Handling

Invalid SQL

If the SQL query is malformed, the server returns a gRPC error during getFlightInfo:
try:
    info = client.get_flight_info(
        flight.FlightDescriptor.for_command(b"INVALID SQL")
    )
except flight.FlightServerError as e:
    print(f"Query error: {e}")

Streaming Errors

Errors during query execution are returned in the doGet stream:
try:
    reader = client.do_get(info.endpoints[0].ticket)
    for batch in reader:
        # Process batch
        pass
except flight.FlightServerError as e:
    print(f"Execution error: {e}")
