GraphX is Apache Spark’s component for graphs and graph-parallel computation. It extends the RDD abstraction by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge.

Overview

GraphX provides you with:
  • A property graph abstraction for directed multigraphs
  • Fundamental graph operators (e.g., subgraph, joinVertices, aggregateMessages)
  • An optimized variant of the Pregel API for iterative graph computation
  • Built-in graph algorithms (PageRank, Connected Components, Triangle Counting)
  • Graph builders for constructing graphs from various sources

Getting Started

To use GraphX, you need to import the necessary packages:
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
If you’re not using the Spark shell, you’ll also need to create a SparkContext to get started.

The Property Graph

The property graph is a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph can have multiple parallel edges sharing the same source and destination vertex.

Graph Structure

Each vertex is keyed by a unique 64-bit long identifier (VertexId). GraphX does not impose any ordering constraints on vertex identifiers. Similarly, edges have corresponding source and destination vertex identifiers. The property graph is parameterized over the vertex (VD) and edge (ED) types:
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
GraphX optimizes the representation of vertex and edge types when they are primitive data types (e.g., int, double), reducing memory footprint by storing them in specialized arrays.

Creating a Property Graph

You can construct a graph from collections of RDDs. Here’s an example creating a social network graph:
// Assume a SparkContext named sc has already been constructed
// (the Spark shell provides one automatically)

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
The resulting graph has the type signature:
val userGraph: Graph[(String, String), String]

Working with Vertices and Edges

You can access and filter vertices and edges using standard RDD operations:
// graph: Graph[(String, String), String], constructed above

// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count

// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count

Using the Triplet View

The triplet view logically joins vertex and edge properties, yielding an RDD[EdgeTriplet[VD, ED]]:
// graph: Graph[(String, String), String], constructed above

// Use the triplets view to create an RDD of facts
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
    
facts.collect.foreach(println(_))

Graph Operators

GraphX provides a collection of operators for transforming and querying graphs. Like RDDs, property graphs are immutable and operations produce new graphs with transformed properties and structure.

Property Operators

Transform vertex or edge properties while preserving graph structure:
class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
Always use mapVertices instead of manually mapping the vertices RDD. This preserves the structural indices and enables GraphX optimizations.
Example initializing a graph for PageRank:
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))

// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
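
The advice about preferring mapVertices can be illustrated with a small sketch. It assumes a tiny hypothetical graph (the names vertices, edges, rebuilt, and mapped are illustrative, not from the guide) and a local SparkContext obtained with SparkContext.getOrCreate. Both variants should yield the same vertex values; the difference is that rebuilding from a mapped RDD discards the structural indices, while mapVertices reuses them.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Reuses the shell's context if one exists; otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical three-vertex graph
val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
val edges: RDD[Edge[Int]] =
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val graph = Graph(vertices, edges)

// Variant 1: map the vertex RDD and rebuild the graph.
// Logically equivalent, but the structural indices are not preserved.
val newVertices = graph.vertices.map { case (id, attr) => (id, attr.toUpperCase) }
val rebuilt = Graph(newVertices, graph.edges)

// Variant 2: mapVertices keeps the existing indices.
val mapped = graph.mapVertices((id, attr) => attr.toUpperCase)

println(mapped.vertices.collect().mkString("\n"))
```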

Structural Operators

Modify the graph structure itself:
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

Subgraph Example

Filter a graph to remove broken links:
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

// The valid subgraph will disconnect users with missing information
validGraph.vertices.collect.foreach(println(_))
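
The mask operator from the signature list can be combined with subgraph to restrict a result computed on the full graph. The following is a minimal sketch under assumed data: the edge from user 5 to user 0 is hypothetical and exists only so that one vertex receives the default "Missing" attribute.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Edge 5 -> 0 refers to a user with no entry in `users` (hypothetical data)
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 0L, "colleague")))

val graph = Graph(users, relationships, ("John Doe", "Missing"))

// Run connected components on the full graph, including the missing user
val ccGraph = graph.connectedComponents()

// Drop the missing user and the edges attached to it
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")

// Keep only the part of the result that also appears in validGraph
val validCCGraph = ccGraph.mask(validGraph)

println(validCCGraph.vertices.collect().mkString("\n"))
```

Note that the component labels were computed before the missing user was removed, so mask only restricts which vertices and edges are kept; it does not recompute the labels.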

Join Operators

Join external data with graphs:
class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
Example setting up a graph with out-degree properties:
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
  outDegOpt match {
    case Some(outDeg) => outDeg
    case None => 0 // No outDegree means zero outDegree
  }
}
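
For comparison, joinVertices keeps the vertex type unchanged and leaves vertices without a matching table entry untouched. A minimal sketch, assuming a hypothetical graph with integer vertex values and a hypothetical bonus table that covers only some of the vertices:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical graph whose vertex property is an Int score
val scores: RDD[(VertexId, Int)] =
  sc.parallelize(Seq((1L, 10), (2L, 20), (3L, 30)))
val links: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "e"), Edge(2L, 3L, "e")))
val graph = Graph(scores, links)

// Hypothetical external table: vertex 2 has no entry
val bonus: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 5), (3L, 7)))

// Matched vertices get old + bonus; vertex 2 keeps its original value
val joined = graph.joinVertices(bonus)((id, old, b) => old + b)

println(joined.vertices.collect().mkString("\n"))
```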

Neighborhood Aggregation

A key operation in graph analytics is aggregating information about the neighborhood of each vertex. GraphX provides the aggregateMessages operator for this purpose.

Aggregate Messages

The aggregateMessages operator applies a user-defined sendMsg function to each edge triplet and uses mergeMsg to aggregate messages at their destination vertex:
class Graph[VD, ED] {
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
}
Example computing the average age of older followers:
// Create a graph with "age" as the vertex property
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )

// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => { // Map Function
    if (triplet.srcAttr > triplet.dstAttr) {
      // Send message to destination vertex containing counter and age
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  // Add counter and age
  (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)

// Divide total age by number of older followers to get average age
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues( (id, value) =>
    value match { case (count, totalAge) => totalAge / count } )
The aggregateMessages operation performs optimally when messages are constant-sized (e.g., floats and addition instead of lists and concatenation).
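
The tripletFields argument in the signature tells GraphX which parts of the triplet sendMsg actually reads. As a sketch of how this might be used, the example below counts incoming edges per vertex on a small hypothetical edge list. Because sendMsg reads neither vertex attributes nor the edge attribute, it passes TripletFields.None.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical edge list: vertex 2 has two incoming edges, vertex 3 has one
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 3L, 1)))
val graph = Graph.fromEdges(edges, 0)

// Send a constant-sized message (the Int 1) along every edge and sum at the destination
val inDeg: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),
  _ + _,
  TripletFields.None)

println(inDeg.collect().mkString("\n"))
```

Vertices that receive no messages (vertex 1 here) do not appear in the result, which matches the behavior of graph.inDegrees.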

Computing Degree Information

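The degree operators (inDegrees, outDegrees, and degrees) return the in-degree, out-degree, and total degree of each vertex. For example, you can use them to find the vertices with the highest degrees: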
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
  if (a._2 > b._2) a else b
}

// Compute the max degrees
val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)

Pregel API

GraphX exposes a variant of the Pregel API for iterative graph-parallel computation. The Pregel operator executes in a series of super steps where vertices receive messages, compute new values, and send messages to neighbors.

Pregel Signature

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,
       maxIterations: Int = Int.MaxValue,
       activeDirection: EdgeDirection = EdgeDirection.Either)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED]
}

Single Source Shortest Path Example

Here’s how to implement SSSP using Pregel:
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)

val sourceId: VertexId = 42 // The ultimate source

// Initialize the graph such that all vertices except the root have distance infinity
val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
  triplet => {  // Send Message
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  (a, b) => math.min(a, b) // Merge Message
)

println(sssp.vertices.collect().mkString("\n"))
To avoid stack overflow errors due to long lineage chains, Pregel supports periodic checkpointing. Set spark.graphx.pregel.checkpointInterval to a positive number (e.g., 10) and configure a checkpoint directory using SparkContext.setCheckpointDir.
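
One way to apply these two settings is sketched below. It assumes that the interval is read from the configuration of the SparkContext, so it is set on the SparkConf before the context is created. The temporary directory is a hypothetical location for local runs; a cluster would normally point at shared storage.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Set the interval before the context exists. If a context is already
// running (for example in the Spark shell), pass the setting at launch
// with --conf instead, because getOrCreate will not change a live context.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("pregel-checkpoint-sketch")
  .set("spark.graphx.pregel.checkpointInterval", "10")
val sc = SparkContext.getOrCreate(conf)

// Hypothetical checkpoint location: a fresh temp directory
val checkpointDir = Files.createTempDirectory("graphx-checkpoints").toUri.toString
sc.setCheckpointDir(checkpointDir)
```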

Graph Algorithms

GraphX includes several built-in graph algorithms that you can use directly.

PageRank

PageRank measures the importance of each vertex, assuming edges represent endorsements:
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Run PageRank
val ranks = graph.pageRank(0.0001).vertices

// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}

val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}

println(ranksByUsername.collect().mkString("\n"))
GraphX provides both static and dynamic PageRank implementations:
  • Static PageRank runs for a fixed number of iterations
  • Dynamic PageRank runs until ranks converge (stop changing by more than a specified tolerance)
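
The two variants can be called side by side, as in this minimal sketch. The four-vertex edge list is hypothetical, and the iteration count and tolerance are arbitrary illustrative values.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical graph: a cycle 1 -> 2 -> 3 -> 1 plus an extra edge 4 -> 1
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(4L, 1L, 1)))
val graph = Graph.fromEdges(edges, 0)

// Static: exactly 10 iterations, regardless of convergence
val staticRanks = graph.staticPageRank(10).vertices

// Dynamic: iterate until no rank changes by more than the tolerance
val dynamicRanks = graph.pageRank(0.0001).vertices

println(staticRanks.collect().mkString("\n"))
println(dynamicRanks.collect().mkString("\n"))
```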

Connected Components

The connected components algorithm labels each connected component with the ID of its lowest-numbered vertex:
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Find the connected components
val cc = graph.connectedComponents().vertices

// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}

val ccByUsername = users.join(cc).map {
  case (id, (username, cc)) => (username, cc)
}

println(ccByUsername.collect().mkString("\n"))
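
To see the labeling rule on data small enough to check by hand, here is a sketch with a hypothetical edge list that forms two components, {1, 2, 3} and {4, 5}:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical edges: 1-2-3 form one component, 4-5 another
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, 0)

// Each vertex is labeled with the lowest vertex ID in its component
val labels = graph.connectedComponents().vertices.collect().toMap

println(labels.toSeq.sortBy(_._1).mkString("\n"))
```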

Triangle Counting

Triangle counting determines the number of triangles passing through each vertex, providing a measure of clustering:
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices

// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}

val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
  (username, tc)
}

println(triCountByUsername.collect().mkString("\n"))
Triangle counting requires edges to be in canonical orientation (srcId < dstId) and the graph to be partitioned using Graph.partitionBy.
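
These requirements can be checked on a hand-verifiable graph. The sketch below assumes a hypothetical edge list that is already in canonical orientation: vertices 1, 2, and 3 form one triangle, and vertex 4 hangs off vertex 3.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Every edge satisfies srcId < dstId (canonical orientation)
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1), Edge(3L, 4L, 1)))

val graph = Graph.fromEdges(edges, 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Number of triangles passing through each vertex
val triCounts = graph.triangleCount().vertices.collect().toMap

println(triCounts.toSeq.sortBy(_._1).mkString("\n"))
```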

Graph Builders

GraphX provides several methods for constructing graphs from various sources.

From Edge List File

Load a graph from a list of edges on disk:
object GraphLoader {
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      minEdgePartitions: Int = 1)
    : Graph[Int, Int]
}
The file should contain one edge per line, written as a source vertex ID and a destination vertex ID separated by whitespace. Lines that begin with # are treated as comments:
# This is a comment
2 1
4 1
1 2
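
As a quick check of this format, the sketch below writes the sample above to a temporary file and loads it. The temp file and its name are hypothetical stand-ins for a real dataset path.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Write the sample edge list to a temporary file (hypothetical location)
val file = Files.createTempFile("edges", ".txt")
Files.write(file, "# This is a comment\n2 1\n4 1\n1 2\n".getBytes(StandardCharsets.UTF_8))

// The comment line is skipped; each remaining line becomes one edge
val graph = GraphLoader.edgeListFile(sc, file.toUri.toString)

println(s"vertices = ${graph.numVertices}, edges = ${graph.numEdges}")
```

The loaded graph has vertices 1, 2, and 4. The edges 2 -> 1 and 1 -> 2 are kept as separate edges because the default is canonicalOrientation = false.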

From RDDs

Create graphs from RDDs of vertices and edges:
// Create from vertices and edges RDDs
val graphFromRdds = Graph(vertices, edges, defaultVertexAttr)

// Create from edges only (vertices are inferred and assigned defaultValue)
val graphFromEdges = Graph.fromEdges(edges, defaultValue)

// Create from edge tuples; setting uniqueEdges merges duplicate edges
val graphFromTuples = Graph.fromEdgeTuples(rawEdges, defaultValue,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut))
None of the graph builders repartition edges by default. Use Graph.partitionBy to repartition the graph if needed for operations like groupEdges.
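
A minimal sketch of that pattern follows, assuming a hypothetical edge list in which the edge 1 -> 2 appears twice. The graph is repartitioned first so that the duplicate edges end up in the same partition, and groupEdges then sums their attributes.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

// Hypothetical multigraph: the edge 1 -> 2 is duplicated
val rawEdges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val multiGraph = Graph.fromEdges(rawEdges, 0)

// Repartition, then merge parallel edges by adding their attributes
val merged = multiGraph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)

val mergedEdges = merged.edges.collect().map(e => (e.srcId, e.dstId, e.attr)).toSet
println(mergedEdges.mkString("\n"))
```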

Caching and Performance

When using a graph multiple times, you should cache it to avoid recomputation:
// Cache the graph for reuse
val cachedGraph = graph.cache()
In iterative computations, uncaching intermediate results that are no longer needed can also improve performance, because otherwise they stay in memory until they are evicted. For iterative computation, use the Pregel API, which unpersists intermediate results automatically.
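
For loops that do not fit the Pregel model, the same idea can be applied by hand. The sketch below is only an illustration under assumed data (a hypothetical two-vertex graph and an arbitrary three iterations): each round caches and materializes the new graph before releasing the previous one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-sketch"))

val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 1)))
var g: Graph[Int, Int] = Graph.fromEdges(edges, 0).cache()

for (_ <- 1 to 3) {
  val prev = g
  // Derive the next graph and cache it
  g = prev.mapVertices((_, v) => v + 1).cache()
  // Materialize the new graph before dropping the one it depends on
  g.vertices.count()
  g.edges.count()
  // Release the previous iteration's cached data
  prev.unpersist(blocking = false)
}

println(g.vertices.collect().mkString("\n"))
```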

Comprehensive Example

Here’s a complete example that builds a graph, filters it, runs PageRank, and returns top users:
// Load user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
  .map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))

// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")

// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
  case (uid, deg, Some(attrList)) => attrList
  case (uid, deg, None) => Array.empty[String]
}

// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)

// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)

// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
  case (uid, attrList, Some(pr)) => (pr, attrList.toList)
  case (uid, attrList, None) => (0.0, attrList.toList)
}

println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))
This example demonstrates how you can chain multiple GraphX operations to perform complex graph analytics in just a few lines of code.
