This guide walks you through the fundamentals of working with Spark SQL and DataFrames.

Starting Point: SparkSession

The entry point into all Spark SQL functionality is the SparkSession class. Create a basic SparkSession to get started:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
SparkSession provides built-in support for Hive features including HiveQL, Hive UDFs, and reading from Hive tables. You don’t need an existing Hive setup to use these features.

Creating DataFrames

You can create DataFrames from various sources:

From JSON Files

# Read JSON file
df = spark.read.json("examples/src/main/resources/people.json")

# Display the content
df.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

From Other Sources

DataFrames can be created from:
  • Existing RDDs
  • Hive tables
  • Various data sources (Parquet, CSV, JDBC, etc.)

DataFrame Operations

DataFrames provide a domain-specific language for structured data manipulation:

Basic Operations

# Print the schema
df.printSchema()
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
# +-------+
# |   name|
# +-------+
# |Michael|
# |   Andy|
# | Justin|
# +-------+

# Select and transform columns
df.select(df['name'], df['age'] + 1).show()
# +-------+---------+
# |   name|(age + 1)|
# +-------+---------+
# |Michael|     null|
# |   Andy|       31|
# | Justin|       20|
# +-------+---------+

# Filter rows
df.filter(df['age'] > 21).show()
# +---+----+
# |age|name|
# +---+----+
# | 30|Andy|
# +---+----+

# Group and aggregate
df.groupBy("age").count().show()
# +----+-----+
# | age|count|
# +----+-----+
# |  19|    1|
# |null|    1|
# |  30|    1|
# +----+-----+
In Python, you can access DataFrame columns either by attribute (df.age) or by indexing (df['age']). The attribute form is convenient for interactive exploration, but the indexing form is recommended: it is future-proof and won't break when a column name collides with an attribute or method of the DataFrame class.

Running SQL Queries

You can run SQL queries programmatically by registering DataFrames as temporary views:
# Register the DataFrame as a temporary view
df.createOrReplaceTempView("people")

# Run SQL query
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
# +----+-------+
# | age|   name|
# +----+-------+
# |null|Michael|
# |  30|   Andy|
# |  19| Justin|
# +----+-------+

# Complex SQL query
results = spark.sql("""
    SELECT name, age 
    FROM people 
    WHERE age > 21 
    ORDER BY age DESC
""")
results.show()

Global Temporary Views

Temporary views are session-scoped and disappear when the session terminates. For views shared across all sessions, create a global temporary view:
# Register as global temporary view
df.createGlobalTempView("people")

# Global temporary view is in the global_temp database
spark.sql("SELECT * FROM global_temp.people").show()

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
Global temporary views are tied to the system-preserved database global_temp and kept alive until the Spark application terminates.

Creating Datasets

The Dataset API is available in Scala and Java only. Python and R do not support it, but because of their dynamic nature, many of its benefits are already available to them through DataFrames.
Datasets use specialized encoders to serialize objects for processing:
// Define a case class
case class Person(name: String, age: Long)

// Create Dataset from sequence
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()

// Create Dataset from primitive types
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect()  // Returns: Array(2, 3, 4)

Interoperating with RDDs

You can convert RDDs to DataFrames using two methods:

Inferring Schema Using Reflection

Spark SQL can infer the schema from the RDD structure:
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert to RDD of Row objects
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer schema and create DataFrame
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# Run SQL queries
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
teenagers.show()

Programmatically Specifying Schema

When the columns and their types aren’t known until runtime, you can construct the schema programmatically:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = spark.sparkContext

# Load text file and convert to RDD of tuples
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], int(p[1].strip())))  # cast age to int to match the schema below

# Define schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Apply schema to RDD
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")

results = spark.sql("SELECT name FROM people")
results.show()

Next Steps

Data Sources

Learn to work with different file formats and databases

Performance Tuning

Optimize your Spark SQL queries for better performance
