Spark SQL provides a powerful SQL interface for querying structured data in Spark. You can use standard SQL syntax to interact with DataFrames, tables, and views.
SQL Function Reference
Access the complete built-in functions documentation:
Spark SQL Built-in Functions
This comprehensive reference includes:
- Aggregate functions
- String functions
- Date and timestamp functions
- Math functions
- Collection functions
- Window functions
- JSON functions
- And many more
SQL Syntax Reference
For detailed SQL syntax documentation, see the SQL Reference Guide.
Key Topics
Running SQL Queries
Execute SQL queries using the sql() method:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Run a SQL query
result = spark.sql("""
SELECT department, AVG(salary) as avg_salary
FROM employees
WHERE hire_date >= '2020-01-01'
GROUP BY department
ORDER BY avg_salary DESC
""")
result.show()
Common SQL Operations
SELECT Queries
-- Basic SELECT
SELECT name, age, department FROM employees;
-- With WHERE clause
SELECT name, salary
FROM employees
WHERE department = 'Engineering' AND salary > 100000;
-- With aggregations
SELECT department, COUNT(*) as emp_count, AVG(salary) as avg_salary
FROM employees
GROUP BY department;
-- With HAVING clause
SELECT department, AVG(salary) as avg_salary
FROM employees
GROUP BY department
HAVING AVG(salary) > 80000;
JOINs
-- INNER JOIN
SELECT e.name, e.salary, d.dept_name
FROM employees e
INNER JOIN departments d ON e.dept_id = d.id;
-- LEFT JOIN
SELECT e.name, d.dept_name
FROM employees e
LEFT JOIN departments d ON e.dept_id = d.id;
-- Multiple joins
SELECT e.name, d.dept_name, l.city
FROM employees e
INNER JOIN departments d ON e.dept_id = d.id
INNER JOIN locations l ON d.location_id = l.id;
Subqueries
-- Scalar subquery
SELECT name, salary
FROM employees
WHERE salary > (SELECT AVG(salary) FROM employees);
-- IN subquery
SELECT name FROM employees
WHERE dept_id IN (
SELECT id FROM departments WHERE location = 'NYC'
);
-- EXISTS subquery
SELECT d.dept_name
FROM departments d
WHERE EXISTS (
SELECT 1 FROM employees e WHERE e.dept_id = d.id
);
Window Functions
-- ROW_NUMBER
SELECT
name,
salary,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as row_num
FROM employees;
-- Running total
SELECT
date,
amount,
SUM(amount) OVER (ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total
FROM transactions;
-- LAG and LEAD
SELECT
date,
revenue,
LAG(revenue, 1) OVER (ORDER BY date) as prev_revenue,
LEAD(revenue, 1) OVER (ORDER BY date) as next_revenue
FROM daily_sales;
CTEs (Common Table Expressions)
WITH high_earners AS (
SELECT * FROM employees WHERE salary > 100000
),
dept_stats AS (
SELECT department, AVG(salary) as avg_salary
FROM high_earners
GROUP BY department
)
SELECT * FROM dept_stats WHERE avg_salary > 120000;
DDL Operations
Creating Tables
-- Create table
CREATE TABLE employees (
id INT,
name STRING,
age INT,
department STRING,
salary DOUBLE
)
USING parquet
PARTITIONED BY (department);
-- Create table from query
CREATE TABLE high_earners AS
SELECT * FROM employees WHERE salary > 100000;
-- Create external table (Hive-format syntax; unmanaged — dropping the table leaves the data at LOCATION in place)
CREATE EXTERNAL TABLE external_data (
col1 STRING,
col2 INT
)
LOCATION 's3://bucket/path/';
Managing Tables
-- Show tables
SHOW TABLES;
SHOW TABLES IN database_name;
-- Describe table
DESCRIBE employees;
DESCRIBE EXTENDED employees;
-- Drop table
DROP TABLE IF EXISTS employees;
-- Truncate table
TRUNCATE TABLE employees;
Views
-- Create view
CREATE VIEW high_salary_employees AS
SELECT * FROM employees WHERE salary > 100000;
-- Create temporary view
CREATE TEMPORARY VIEW temp_employees AS
SELECT * FROM employees WHERE hire_date >= '2023-01-01';
-- Drop view
DROP VIEW IF EXISTS high_salary_employees;
DML Operations
INSERT
-- Insert values
INSERT INTO employees VALUES
(1, 'Alice', 30, 'Engineering', 120000),
(2, 'Bob', 35, 'Sales', 90000);
-- Insert from query
INSERT INTO high_earners
SELECT * FROM employees WHERE salary > 100000;
-- Insert with column specification
INSERT INTO employees (id, name, salary)
VALUES (3, 'Charlie', 95000);
UPDATE (Supported in Delta Lake)
UPDATE employees
SET salary = salary * 1.1
WHERE department = 'Engineering';
DELETE (Supported in Delta Lake)
DELETE FROM employees
WHERE hire_date < '2020-01-01';
Built-in Functions
String Functions
SELECT
UPPER(name) as upper_name,
LOWER(name) as lower_name,
CONCAT(first_name, ' ', last_name) as full_name,
SUBSTRING(name, 1, 3) as name_prefix,
LENGTH(name) as name_length,
TRIM(name) as trimmed_name
FROM users;
Date and Time Functions
SELECT
CURRENT_DATE() as today,
CURRENT_TIMESTAMP() as now,
DATE_ADD(hire_date, 90) as probation_end,
DATEDIFF(CURRENT_DATE(), hire_date) as days_employed,
YEAR(hire_date) as hire_year,
MONTH(hire_date) as hire_month,
DATE_FORMAT(hire_date, 'yyyy-MM') as hire_month_formatted
FROM employees;
Aggregate Functions
SELECT
COUNT(*) as total_count,
COUNT(DISTINCT department) as dept_count,
SUM(salary) as total_salary,
AVG(salary) as avg_salary,
MIN(salary) as min_salary,
MAX(salary) as max_salary,
STDDEV(salary) as salary_stddev
FROM employees;
Collection Functions
SELECT
ARRAY_CONTAINS(skills, 'Python') as has_python,
SIZE(skills) as skill_count,
EXPLODE(skills) as individual_skill,
ARRAY_JOIN(skills, ', ') as skills_list
FROM employees;
JSON Functions
SELECT
GET_JSON_OBJECT(metadata, '$.age') as age,
JSON_TUPLE(metadata, 'name', 'city') as (name, city)
FROM users;
Performance Best Practices
Use Partitioning
CREATE TABLE sales (
id INT,
amount DOUBLE,
date DATE,
year INT,
month INT
)
USING parquet
PARTITIONED BY (year, month);
Use Bucketing
CREATE TABLE customers (
id INT,
name STRING,
email STRING
)
USING parquet
CLUSTERED BY (id) INTO 10 BUCKETS;
Optimize Joins
-- Use broadcast hints for small tables
SELECT /*+ BROADCAST(d) */ e.name, d.dept_name
FROM employees e
JOIN departments d ON e.dept_id = d.id;
Configuration
Set SQL configurations:
-- Set configuration
SET spark.sql.shuffle.partitions = 200;
-- View configuration
SET spark.sql.shuffle.partitions;
-- Reset configuration
RESET spark.sql.shuffle.partitions;
SQL CLI
Run SQL queries from the command line:
# Start Spark SQL CLI
./bin/spark-sql
# Run SQL file
./bin/spark-sql -f queries.sql
# Run inline query
./bin/spark-sql -e "SELECT * FROM employees LIMIT 10"
Additional Resources
Spark SQL is optimized by the Catalyst query optimizer, which automatically optimizes your queries for better performance.