Optimizing Delta Sharing performance requires understanding data transfer patterns, file formats, and query execution strategies. This guide covers techniques to maximize throughput and minimize latency when sharing large datasets.

Large Dataset Best Practices

When sharing large datasets (hundreds of GBs to TBs), implement these optimization strategies:

Partition Your Data

Partitioning significantly improves query performance by enabling partition pruning:
# Example: Time-series data partitioned by date
import delta_sharing

profile_file = "profile.share"
table_url = f"{profile_file}#share.schema.events"

# Query with partition filter for better performance
df = delta_sharing.load_as_pandas(
    table_url,
    predicateHints=["date >= '2024-01-01'", "date <= '2024-01-31'"]
)
Partitioning Benefits:
  • Reduced data scanning - Only relevant partitions are read
  • Faster query execution - Fewer files to process
  • Lower network transfer - Less data moved over the network
  • Cost savings - Reduced compute and data transfer costs
Choose partition columns based on your query patterns. Common choices include:
  • Time dimensions: date, year, month, hour
  • Geographic dimensions: country, region, state
  • Category dimensions: product_type, customer_segment

File Size Optimization

Optimal file sizes balance parallelism and overhead.
Recommended File Sizes:
  • Minimum: 128 MB - Avoids excessive file listing overhead
  • Optimal: 256 MB to 1 GB - Good balance for most workloads
  • Maximum: 2 GB - Prevents memory issues in readers
# When creating Delta tables, control file sizes
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Configure file size
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000000)
spark.conf.set("spark.databricks.delta.targetFileSize", "256mb")

df.write.format("delta") \
    .partitionBy("date") \
    .save("/path/to/table")
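If an existing shared table has accumulated many small files, the provider can compact it back toward the target size. A minimal sketch using the OSS Delta Lake Python API (assumes the delta-spark package, version 2.0 or later, and write access; the table path is hypothetical):
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact small files into larger ones (provider-side maintenance)
delta_table = DeltaTable.forPath(spark, "/path/to/table")
delta_table.optimize().executeCompaction()

# Optionally co-locate data for multi-dimensional queries (Z-ordering)
delta_table.optimize().executeZOrderBy("date", "country")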

Limit Hint Usage

Use limitHint to reduce data transfer when you only need a subset:
# Fetch sample data efficiently
sample_df = delta_sharing.load_as_pandas(
    table_url,
    limit=1000  # Server may return fewer files
)

# For Spark connector
df = spark.read.format("deltaSharing") \
    .option("limitHint", "10000") \
    .load(table_url)
Best Effort Optimization: The limitHint is a best-effort parameter. The server may return more rows than requested. Always apply limits in your query if strict row counts are required.
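If a strict row count is required, a minimal sketch of enforcing it client-side after the hinted load:
sample_df = delta_sharing.load_as_pandas(table_url, limit=1000)
sample_df = sample_df.head(1000)  # enforce the exact row count locally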

Delta Format vs Parquet Format

Delta Sharing supports two response formats with different performance characteristics:

Response Format Comparison

Parquet Format (Default)

When to Use:
  • Legacy clients (delta-sharing-spark < 3.1)
  • Simple tables without advanced Delta features
  • Maximizing client compatibility
Characteristics:
  • Compatible with all Delta Sharing clients
  • Server converts Delta metadata to simplified format
  • Limited to basic Delta features (minReaderVersion = 1)
  • No support for Deletion Vectors or Column Mapping
Request:
# Default - no header needed
df = delta_sharing.load_as_pandas(table_url)
Response Structure:
{
  "protocol": {"minReaderVersion": 1},
  "metaData": {...},
  "file": {"url": "https://...", "id": "...", "size": 573}
}

Delta Format

When to Use:
  • Modern clients (delta-sharing-spark >= 3.1)
  • Tables with advanced Delta features
  • Need for Deletion Vectors or Column Mapping
  • Performance optimization through native Delta reading
Characteristics:
  • Supports advanced Delta features (minReaderVersion > 1)
  • Native Delta log structure preserved
  • Better performance with Delta Kernel
  • Deletion Vectors for efficient updates
  • Column Mapping for schema evolution
Request:
# Explicitly request Delta format
df = delta_sharing.load_as_pandas(
    table_url,
    use_delta_format=True
)
Request Header:
delta-sharing-capabilities: responseformat=delta;readerfeatures=deletionvectors,columnmapping
Response Structure:
{
  "protocol": {
    "deltaProtocol": {"minReaderVersion": 3, "minWriterVersion": 7}
  },
  "metaData": {
    "deltaMetadata": {...},
    "location": "s3://bucket/table"
  },
  "file": {
    "deltaSingleAction": {"add": {...}}
  }
}

Format Selection Strategy

# Let the client choose the optimal format
df = delta_sharing.load_as_pandas(
    table_url
    # Client auto-detects capabilities and negotiates with the server
)

# Explicitly prefer Delta format with Parquet fallback:
# the server returns Delta if the table has advanced features,
# Parquet otherwise
df = delta_sharing.load_as_pandas(
    table_url,
    use_delta_format=True
)
Performance Impact: Delta format can provide 2-5x better query performance for tables with:
  • Deletion Vectors (updates/deletes)
  • Column Mapping (schema evolution)
  • Large partition counts

Batch Conversion for Memory Optimization

For large tables that don’t fit in memory, use batch conversion:

Memory-Efficient Data Loading

import delta_sharing

profile_file = "profile.share"
table_url = f"{profile_file}#share.schema.large_table"

# Load in batches to reduce memory usage
df = delta_sharing.load_as_pandas(
    table_url,
    convert_in_batches=True
)
How Batch Conversion Works:
  1. Fetches file metadata from server
  2. Downloads and converts Parquet files in smaller batches
  3. Concatenates results incrementally
  4. Reduces peak memory consumption
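The same principle, sketched manually with pyarrow for a single downloaded Parquet file (illustrative only; the connector handles this internally when convert_in_batches=True):
import pandas as pd
import pyarrow.parquet as pq

def parquet_to_pandas_in_batches(path, batch_size=1_000_000):
    """Convert a Parquet file to pandas in record batches to cap peak memory."""
    pieces = []
    for batch in pq.ParquetFile(path).iter_batches(batch_size=batch_size):
        pieces.append(batch.to_pandas())
    return pd.concat(pieces, ignore_index=True)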
Memory Comparison:
  Method             | Peak Memory            | Best For
  Standard           | ~2x table size         | Tables < 10 GB
  Batch Conversion   | ~1.2x table size       | Tables > 10 GB
  Streaming (Spark)  | ~100 MB per partition  | Tables > 100 GB

Spark Streaming for Continuous Data

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val tablePath = "profile.share#share.schema.events"

// Stream new data efficiently
val df = spark.readStream
  .format("deltaSharing")
  .option("startingVersion", "1")
  .option("skipChangeCommits", "true")
  .load(tablePath)

// Process with micro-batches
df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
Streaming Performance Configuration:
# Configure streaming intervals
spark.conf.set(
    "spark.delta.sharing.streaming.queryTableVersionIntervalSeconds", 
    "60"  # Check for new data every 60 seconds (min: 10s)
)

# Limit versions per RPC to control batch size
df = spark.readStream \
    .format("deltaSharing") \
    .option("maxVersionsPerRpc", "10") \
    .load(tablePath)
Streaming Best Practices:
  • Set maxVersionsPerRpc to 10-20 for stable load
  • Use checkpoints for fault tolerance
  • Monitor lag between source and consumer (see the sketch after this list)
  • Increase queryTableVersionIntervalSeconds to reduce server load
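A small sketch of checking consumer lag from the running query's progress report (PySpark; field names follow the standard StreamingQueryProgress output):
# Start the stream, then inspect the most recent micro-batch progress
query = df.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()

progress = query.lastProgress  # dict for the latest micro-batch, or None
if progress:
    print("Input rows:", progress["numInputRows"])
    print("Processing rate:", progress["processedRowsPerSecond"], "rows/s")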

Predicate Pushdown Optimization

Predicate pushdown filters data at the source, reducing network transfer:

SQL Predicate Hints

# Filter data server-side (best effort)
df = delta_sharing.load_as_pandas(
    table_url,
    predicateHints=[
        "date >= '2024-01-01'",
        "country = 'US'",
        "revenue > 1000"
    ]
)
Supported SQL Expressions:
  Operator     | Example              | Use Case
  =            | col = 123            | Exact match
  >, <         | col > 1000           | Range queries
  >=, <=       | col >= '2024-01-01'  | Inclusive ranges
  <>           | col <> 'US'          | Exclusion
  IS NULL      | col IS NULL          | Null checks
  IS NOT NULL  | col IS NOT NULL      | Non-null checks
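With the Spark connector, ordinary DataFrame filters on partition columns can be pushed down to the server as predicate hints, so the same pruning applies without explicit options (a minimal sketch):
# Partition filters in the query can be pushed down as predicate hints
df = spark.read.format("deltaSharing") \
    .load(table_url) \
    .filter("date >= '2024-01-01' AND country = 'US'")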

JSON Predicate Hints

JSON predicates provide structured, type-safe filtering:
import json

# Complex predicate with type safety
json_predicate = json.dumps({
    "op": "and",
    "children": [
        {
            "op": "equal",
            "children": [
                {"op": "column", "name": "hireDate", "valueType": "date"},
                {"op": "literal", "value": "2021-04-29", "valueType": "date"}
            ]
        },
        {
            "op": "greaterThan",
            "children": [
                {"op": "column", "name": "salary", "valueType": "long"},
                {"op": "literal", "value": "50000", "valueType": "long"}
            ]
        }
    ]
})

df = delta_sharing.load_as_pandas(
    table_url,
    jsonPredicateHints=json_predicate
)
JSON Predicate Benefits:
  • Type-safe filtering with explicit valueType
  • Complex boolean logic (and, or, not)
  • Easier for servers to parse and optimize
  • Preferred over SQL predicateHints
Best Effort Filtering: Both predicateHints and jsonPredicateHints are best effort. The server may:
  • Return files that don’t satisfy predicates
  • Skip predicates if parsing fails
  • Return all files if validation fails
Always apply predicates client-side for correctness.
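A minimal sketch of re-applying the same predicates client-side after a hinted load, so correctness never depends on server-side filtering:
df = delta_sharing.load_as_pandas(
    table_url,
    predicateHints=["date >= '2024-01-01'", "country = 'US'"]
)

# Re-apply the filter locally; hints only reduce which files are returned
df = df[(df["date"] >= "2024-01-01") & (df["country"] == "US")]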

Version and Time Travel Performance

Querying historical versions efficiently:

Version-Based Queries

# Query specific table version
df_v100 = delta_sharing.load_as_pandas(
    table_url,
    version=100  # Fast - uses cached metadata
)

# Query by timestamp
df_jan1 = delta_sharing.load_as_pandas(
    table_url,
    timestamp="2024-01-01T00:00:00Z"
)
Performance Characteristics:
  • Version queries: Fast - direct metadata lookup
  • Timestamp queries: Slower - requires timestamp-to-version mapping
  • Latest version: Fastest - no additional lookups

Change Data Feed (CDF) Performance

# Efficiently query incremental changes
changes_df = delta_sharing.load_table_changes_as_pandas(
    table_url,
    starting_version=100,
    ending_version=150  # Hint to limit data transfer
)

# Use Delta format for CDF
changes_df = delta_sharing.load_table_changes_as_pandas(
    table_url,
    starting_version=100,
    use_delta_format=True,
    convert_in_batches=True
)
CDF Optimization Tips:
  • Use ending_version to limit query window
  • Enable convert_in_batches for large change sets
  • Query smaller version ranges for faster results (see the incremental-sync sketch after this list)
  • Use Delta format for tables with frequent updates
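One way to keep version ranges small is an incremental-sync loop that tracks a high-water mark and pulls bounded windows of changes. A sketch (the window size and the use of the _commit_version CDF column as the next starting point are illustrative assumptions):
import delta_sharing

def sync_next_window(table_url, last_version, window=50):
    """Pull one bounded window of changes; return the new high-water mark and the data."""
    changes = delta_sharing.load_table_changes_as_pandas(
        table_url,
        starting_version=last_version + 1,
        ending_version=last_version + window,
    )
    if changes.empty:
        return last_version, changes
    # CDF rows carry _commit_version; advance the high-water mark from the data itself
    return int(changes["_commit_version"].max()), changes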

Caching Strategies

Implement caching to reduce repeated data transfers:

Client-Side Caching

import delta_sharing
import hashlib

class CachedSharingClient:
    def __init__(self, profile_file):
        self.client = delta_sharing.SharingClient(profile_file)
        self.cache = {}
    
    def load_table_cached(self, table_url, **kwargs):
        # Create cache key from table URL and parameters
        cache_key = hashlib.md5(
            f"{table_url}{kwargs}".encode()
        ).hexdigest()
        
        # Check version to invalidate cache
        current_version = self.get_table_version(table_url)
        
        if cache_key in self.cache:
            cached_version, data = self.cache[cache_key]
            if cached_version == current_version:
                return data
        
        # Load and cache data
        data = delta_sharing.load_as_pandas(table_url, **kwargs)
        self.cache[cache_key] = (current_version, data)
        return data
    
    def get_table_version(self, table_url):
        # Extract share, schema, table from URL
        # Make HEAD request to get version
        # Implementation details...
        pass
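Usage is then a drop-in for the direct load (assuming get_table_version is implemented):
cached = CachedSharingClient("profile.share")
df = cached.load_table_cached("profile.share#share.schema.events", limit=1000)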

File-Level Caching

Delta Sharing uses file id for consistent file identification:
{
  "file": {
    "id": "591723a8-6a27-4240-a90e-57426f4736d2",
    "url": "https://...",
    "size": 573
  }
}
Caching Strategy:
  • Cache downloaded Parquet files by id
  • Check expirationTimestamp before reusing URLs
  • Implement LRU eviction for cache size management
  • Use local SSD for cache storage
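A minimal sketch of this strategy (the cache directory is hypothetical, and the expirationTimestamp check assumes the field is present and in epoch milliseconds):
import os
import time
import urllib.request

CACHE_DIR = "/mnt/ssd/delta-sharing-cache"  # hypothetical local SSD path

def get_cached_file(file_action):
    """Return a local path for a shared file, downloading it once per file id."""
    os.makedirs(CACHE_DIR, exist_ok=True)
    local_path = os.path.join(CACHE_DIR, f"{file_action['id']}.parquet")
    if os.path.exists(local_path):
        return local_path
    # Pre-signed URLs expire; refresh metadata instead of reusing a stale URL
    expiration_ms = file_action.get("expirationTimestamp")
    if expiration_ms is not None and expiration_ms / 1000 <= time.time():
        raise ValueError("Pre-signed URL expired; re-fetch file metadata")
    urllib.request.urlretrieve(file_action["url"], local_path)
    return local_path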

Network and Connection Optimization

Connection Pooling

import delta_sharing
import urllib3

# Configure connection pooling
http = urllib3.PoolManager(
    maxsize=10,  # Maximum connections
    block=True,  # Block when pool is full
    retries=urllib3.Retry(
        total=3,
        backoff_factor=0.5
    )
)

# Use a persistent client so connections are reused across metadata calls
profile_file = "profile.share"
client = delta_sharing.SharingClient(profile_file)
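The pool can then be used to fetch the pre-signed file URLs the server returns (a sketch; presigned_file_url is a placeholder):
# Download a returned file URL through the pooled, retrying connection
response = http.request("GET", presigned_file_url)
with open("part-0000.parquet", "wb") as f:
    f.write(response.data)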

Parallel Downloads

from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def download_file(file_info):
    """Download single Parquet file."""
    url = file_info['url']
    return pd.read_parquet(url)

# Get the file list for the table (illustrative helper; the pre-signed file
# URLs come from the server's table query response)
files = delta_sharing.list_files(table_url)

# Download in parallel
with ThreadPoolExecutor(max_workers=10) as executor:
    dataframes = list(executor.map(download_file, files))

# Combine results
final_df = pd.concat(dataframes, ignore_index=True)
Optimal Worker Count: Set parallel workers based on:
  • Available bandwidth: ~10-20 workers for 1 Gbps
  • File count and size: More workers for many small files
  • Client resources: Monitor CPU and memory usage

Monitoring and Profiling

Track performance metrics to identify bottlenecks:

Query Timing

import time
import delta_sharing

def profile_query(table_url, **kwargs):
    """Profile Delta Sharing query performance."""
    
    # Time metadata fetch
    start = time.time()
    # Metadata call
    metadata_time = time.time() - start
    
    # Time data download
    start = time.time()
    df = delta_sharing.load_as_pandas(table_url, **kwargs)
    download_time = time.time() - start
    
    # Calculate metrics
    rows = len(df)
    size_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
    throughput = size_mb / download_time if download_time > 0 else 0
    
    return {
        'metadata_time': metadata_time,
        'download_time': download_time,
        'total_time': metadata_time + download_time,
        'rows': rows,
        'size_mb': size_mb,
        'throughput_mbps': throughput
    }

# Example usage
metrics = profile_query(
    table_url,
    predicateHints=["date >= '2024-01-01'"]
)
print(f"Throughput: {metrics['throughput_mbps']:.2f} MB/s")

Server-Side Metrics

Monitor Delta Sharing server performance.
Key Metrics:
  • Request latency (p50, p95, p99)
  • Requests per second
  • Error rates (401, 403, 500)
  • Data transfer volume
  • Active connections
Log Analysis:
# Analyze server logs for slow queries
grep "query" delta-sharing.log | \
  awk '{print $5}' | \
  sort -n | \
  tail -10

Performance Tuning Checklist

Data Organization:
  • Partition tables by common query dimensions
  • Maintain optimal file sizes (256 MB - 1 GB)
  • Use Z-ordering for multi-dimensional queries (Delta only)
  • Regularly compact small files
Query Optimization:
  • Use predicateHints or jsonPredicateHints for filtering
  • Set appropriate limitHint values
  • Enable Delta format for advanced features
  • Use batch conversion for large tables
Network Efficiency:
  • Implement connection pooling
  • Use parallel downloads for multiple files
  • Cache frequently accessed data
  • Monitor and optimize bandwidth usage
Streaming Performance:
  • Configure appropriate check intervals
  • Limit versions per RPC (10-20)
  • Use checkpoints for fault tolerance
  • Monitor consumer lag
Monitoring:
  • Track query latencies
  • Monitor data transfer volumes
  • Set up alerts for performance degradation
  • Profile slow queries regularly
