Spark SQL supports operating on a variety of data sources through the DataFrame interface. You can use relational transformations and SQL queries on data from any supported source.

Generic Load/Save Functions

The default data source is Parquet (unless configured otherwise via spark.sql.sources.default):
# Load and save with default format (Parquet)
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Manually Specifying Options

You can specify the data source format and pass additional options:

JSON Files

# Read JSON with options
df = spark.read.load(
    "examples/src/main/resources/people.json",
    format="json"
)

# Save as Parquet
df.select("name", "age").write.save(
    "namesAndAges.parquet",
    format="parquet"
)
Built-in data sources have short names: json, parquet, jdbc, orc, csv, text. You can also use fully qualified names like org.apache.spark.sql.parquet.

CSV Files

# Read CSV with options
df = spark.read.load(
    "examples/src/main/resources/people.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true"
)

Run SQL on Files Directly

Instead of loading a file into a DataFrame and then querying it, you can run SQL on the file directly. The path, wrapped in backticks, is qualified with the format name:
SELECT * FROM parquet.`examples/src/main/resources/users.parquet`
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Save Modes

Save operations take an optional mode that specifies how to handle data that already exists at the target:
error, errorifexists (default)
When saving a DataFrame, if data already exists at the target location, an exception is thrown.
append
When saving a DataFrame, if data already exists, the DataFrame's contents are appended to the existing data.
overwrite
When saving a DataFrame, if data already exists, the existing data is replaced by the DataFrame's contents.
ignore
When saving a DataFrame, if data already exists, the save operation writes nothing and leaves the existing data unchanged (similar to CREATE TABLE IF NOT EXISTS in SQL).
df.write.mode("overwrite").save("output.parquet")
df.write.mode("append").save("output.parquet")
Save modes do not utilize locking and are not atomic. When performing an overwrite, data is deleted before writing new data.

Parquet Files

Parquet is a columnar format supported by many data processing systems. Spark SQL provides support for reading and writing Parquet files that automatically preserves the schema.

Loading Parquet Files

# Read Parquet file
df = spark.read.parquet("examples/src/main/resources/users.parquet")

# Write Parquet file
df.write.parquet("users_output.parquet")

Partition Discovery

Table partitioning is a common optimization. Spark SQL automatically discovers and infers partitioning information:
path/to/table/
├── gender=male/
│   ├── country=US/
│   │   └── data.parquet
│   └── country=CN/
│       └── data.parquet
└── gender=female/
    ├── country=US/
    │   └── data.parquet
    └── country=CN/
        └── data.parquet
When you read path/to/table, Spark SQL automatically extracts gender and country as partitioning columns:
df = spark.read.parquet("path/to/table")
df.printSchema()
# root
# |-- name: string
# |-- age: long
# |-- gender: string
# |-- country: string
Partition column data types are automatically inferred. You can disable automatic inference by setting spark.sql.sources.partitionColumnTypeInference.enabled to false.

Schema Merging

Parquet supports schema evolution. Spark SQL can automatically detect and merge schemas from multiple Parquet files:
# Enable schema merging
df = spark.read.option("mergeSchema", "true").parquet("path/to/table")
Or set the global option:
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

JDBC Databases

Spark SQL can read from and write to other databases over JDBC:
# Read from JDBC
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

# Write to JDBC
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
Make sure the JDBC driver JAR is included in your Spark classpath.

Bucketing, Sorting and Partitioning

For file-based data sources, you can optimize output with bucketing, sorting, and partitioning:

Partitioning

# Partition by a column
df.write.partitionBy("favorite_color").parquet("namesPartByColor.parquet")

Bucketing

Bucketing is applicable only to persistent tables:
# Bucket and sort data
df.write \
    .bucketBy(42, "name") \
    .sortBy("age") \
    .saveAsTable("people_bucketed")
The equivalent SQL:
CREATE TABLE people_bucketed
USING parquet
CLUSTERED BY (name) SORTED BY (age) INTO 42 BUCKETS
AS SELECT * FROM people;

Combined Partitioning and Bucketing

df.write \
    .partitionBy("favorite_color") \
    .bucketBy(42, "name") \
    .saveAsTable("users_partitioned_bucketed")
Partitioning creates a directory structure and is limited to low-cardinality columns. Bucketing distributes data across a fixed number of buckets and works well with high-cardinality columns.

Supported Data Formats

Spark SQL supports these built-in data sources:

Parquet: columnar format with automatic schema preservation
ORC: Optimized Row Columnar format
JSON: line-delimited JSON files
CSV: comma-separated values with header support
Text: plain text files
Avro: binary format with schema evolution
Protobuf: Protocol Buffers format
JDBC: relational databases via JDBC

Next Steps

Performance Tuning: learn to optimize your data source operations
Distributed SQL Engine: use Spark as a distributed query engine
