PySpark is the Python API for Apache Spark, allowing you to harness the power of distributed computing with Python’s simplicity.
API Documentation
Access the complete PySpark API documentation at:
Spark Python API (Sphinx)
Main Modules
pyspark.sql
The core module for working with structured data in PySpark.
Key Classes:
- SparkSession - Entry point for Spark functionality
- DataFrame - Distributed collection of data organized into named columns
- Column - Column expression in a DataFrame
- Row - Row of data in a DataFrame
- functions - Built-in functions for DataFrame operations
pyspark.sql.types
Data types for defining schemas.
Key Classes:
- StructType - Schema definition
- StructField - Field in a schema
- Data types: StringType, IntegerType, DoubleType, DateType, TimestampType, etc.
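For example, an explicit schema for the small people dataset used later in this guide could be defined like this (a minimal sketch; the column names are illustrative and `spark` is the SparkSession created in the Quick Start below):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative schema for a simple "people" dataset
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True),
    StructField("department", StringType(), nullable=True)
])

# Apply the schema explicitly instead of relying on inference
df = spark.createDataFrame([("Alice", 25, "Engineering")], schema)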
pyspark.sql.streaming
Streaming API for real-time data processing.
Key Classes:
- DataStreamReader - Interface for loading a streaming DataFrame (spark.readStream)
- DataStreamWriter - Interface for writing a streaming DataFrame out (df.writeStream)
- StreamingQuery - Handle to an active streaming query
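A minimal structured-streaming sketch using the built-in rate source and console sink (illustrative only; assumes a SparkSession named spark already exists):
# Read a streaming DataFrame from the built-in "rate" source
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# Write the stream to the console sink
query = stream_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination(timeout=10)  # let it run briefly
query.stop()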
pyspark.ml
Machine learning library built on DataFrames.
Key Modules:
- pyspark.ml.feature - Feature extractors and transformers (e.g., VectorAssembler, StringIndexer)
- pyspark.ml.classification, pyspark.ml.regression, pyspark.ml.clustering - Estimators for common ML tasks
- pyspark.ml.evaluation - Evaluators for computing model metrics
- pyspark.ml.tuning - Cross-validation and hyperparameter tuning
- Pipeline / PipelineModel - Chain transformers and estimators into a single workflow
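A minimal pipeline sketch with toy data (the feature and label columns are illustrative; assumes a SparkSession named spark already exists):
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Toy training data: two numeric features and a binary label
train = spark.createDataFrame(
    [(1.0, 2.0, 0.0), (2.0, 3.0, 0.0), (5.0, 6.0, 1.0), (6.0, 7.0, 1.0)],
    ["f1", "f2", "label"]
)

# Assemble the feature columns into a vector, then fit a classifier
assembler = VectorAssembler(inputCols=["f1", "f2"], outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = Pipeline(stages=[assembler, lr]).fit(train)

model.transform(train).select("f1", "f2", "prediction").show()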
pyspark.pandas
Pandas API on Spark, enabling a seamless transition from pandas.
Key Features:
- Drop-in replacement for pandas operations
- Distributed computing on large datasets
- Compatible with most pandas APIs
Quick Start Example
Here’s a simple example to get started with PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, desc
# Create SparkSession
spark = SparkSession.builder \
    .appName("PySpark Example") \
    .master("local[*]") \
    .getOrCreate()
# Create a DataFrame
data = [
    ("Alice", 25, "Engineering"),
    ("Bob", 30, "Sales"),
    ("Charlie", 35, "Engineering")
]
df = spark.createDataFrame(data, ["name", "age", "department"])
# Perform transformations
result = df \
    .filter(col("age") > 25) \
    .groupBy("department") \
    .agg(avg("age").alias("avg_age")) \
    .orderBy(desc("avg_age"))
# Show results
result.show()
# Stop the session
spark.stop()
Reading Data
Read data from various sources:
# CSV
df = spark.read.csv("data/people.csv", header=True, inferSchema=True)
# JSON
df = spark.read.json("data/people.json")
# Parquet
df = spark.read.parquet("data/people.parquet")
# JDBC
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/test") \
    .option("dbtable", "people") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
DataFrame Operations
Common DataFrame transformations:
from pyspark.sql.functions import col, lit, when, concat
# Select columns
df.select("name", "age")
# Filter rows
df.filter(col("age") > 25)
df.where(df.age > 25) # Alternative syntax
# Add/modify columns
df.withColumn("age_plus_10", col("age") + 10)
df.withColumnRenamed("name", "full_name")
# Conditional logic
df.withColumn(
    "age_group",
    when(col("age") < 18, "minor")
        .when(col("age") < 65, "adult")
        .otherwise("senior")
)
# Aggregations
df.groupBy("department") \
    .agg({
        "age": "avg",
        "name": "count"
    })
# Joins
df1.join(df2, df1.id == df2.id, "inner")
User-Defined Functions (UDFs)
Create custom functions:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Define a UDF
def categorize_age(age):
    if age < 18:
        return "minor"
    elif age < 65:
        return "adult"
    else:
        return "senior"
categorize_udf = udf(categorize_age, StringType())
# Use the UDF
df_with_category = df.withColumn(
    "age_category",
    categorize_udf(col("age"))
)
SQL Queries
Execute SQL queries on DataFrames:
# Register DataFrame as temporary view
df.createOrReplaceTempView("people")
# Run SQL query
result = spark.sql("""
    SELECT department, AVG(age) as avg_age
    FROM people
    WHERE age > 25
    GROUP BY department
    ORDER BY avg_age DESC
""")
result.show()
Pandas API on Spark
Use familiar pandas syntax with Spark’s distributed computing:
import pyspark.pandas as ps
# Create a pandas-on-Spark DataFrame
psdf = ps.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'department': ['Engineering', 'Sales', 'Engineering']
})
# Use pandas-like operations
filtered = psdf[psdf.age > 25]
grouped = psdf.groupby('department')['age'].mean()
# Convert to Spark DataFrame
spark_df = psdf.to_spark()
# Convert from Spark DataFrame
psdf = ps.DataFrame(spark_df)
Installation
Install PySpark using pip:
pip install pyspark
For Spark Connect (client-only):
pip install pyspark-client
Spark Connect Support
Since Spark 3.4, most PySpark APIs are supported in Spark Connect, including DataFrame, functions, and Column. However, SparkContext and RDD are not supported.
Connect to a remote Spark cluster:
from pyspark.sql import SparkSession
# Connect via Spark Connect
spark = SparkSession.builder \
    .remote("sc://localhost") \
    .getOrCreate()
# Or use environment variable
import os
os.environ['SPARK_REMOTE'] = "sc://localhost"
spark = SparkSession.builder.getOrCreate()
Performance Tips
- Use built-in functions instead of UDFs when possible
- Cache DataFrames that are reused multiple times
- Use broadcast joins for small tables
- Partition data appropriately for your workload
- Use columnar formats like Parquet for better performance
# Cache a DataFrame
df.cache()
# Broadcast join
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "id")
# Repartition
df.repartition(10)
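Complementing the snippets above, a sketch of writing partitioned Parquet output for the last two tips (the output path and partition column are illustrative):
# Write as Parquet, partitioned by department
df.write \
    .mode("overwrite") \
    .partitionBy("department") \
    .parquet("output/people_parquet")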
Additional Resources
For the complete API reference, see the Spark Python API (Sphinx) documentation linked above. When using Spark Connect, always check the API reference for the "Supports Spark Connect" label to verify compatibility.