

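Spark GraphX is the Spark component for graphs and graph-parallel computation. GraphX extends the Spark RDD by introducing the resilient distributed property graph: a directed multigraph with properties attached to every vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators together with an optimized Pregel API. GraphX also ships a growing collection of graph algorithms and graph builders that simplify graph analytics tasks.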


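GraphX stores a graph using the vertex-cut approach. With this scheme every edge lives on exactly one machine, while a vertex may be spread over several machines. The copies of a cut vertex are mirrors of one another: one copy acts as the master and the remaining copies act as mirror (ghost) vertices. When the data of a vertex changes, the master is updated first, and the updated data is then sent to every machine that holds a mirror so that the mirrors are updated as well.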

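VertexTable(id, data): id is the vertex ID, data is the vertex attribute

EdgeTable(pid, src, dst, data): pid is the partition ID, src is the source vertex ID, dst is the destination vertex ID, data is the edge attribute

RoutingTable(id, pid): id is the vertex ID, pid is the partition ID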
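
The relationship between the three tables can be illustrated without Spark. The following is a minimal plain-Scala sketch, not GraphX code: the object, case classes, and sample rows are all hypothetical names chosen for illustration. It assumes that the routing table is simply the distinct set of (vertex ID, edge partition ID) pairs derived from the edge table.

```scala
// Plain-Scala sketch of the three logical tables (no Spark required).
// All names here are illustrative and are not GraphX classes.
object VertexCutTables {
  final case class VertexRow(id: Long, data: String)
  final case class EdgeRow(pid: Int, src: Long, dst: Long, data: Int)
  final case class RoutingRow(id: Long, pid: Int)

  // Both endpoints of an edge must be reachable from that edge's partition,
  // so the routing table is the distinct set of (vertex id, partition id) pairs.
  def routingTable(edges: Seq[EdgeRow]): Seq[RoutingRow] =
    edges
      .flatMap(e => Seq(RoutingRow(e.src, e.pid), RoutingRow(e.dst, e.pid)))
      .distinct
      .sortBy(r => (r.id, r.pid))

  // Sample data: vertex 3 is cut, because its edges live in partitions 0 and 1.
  val vertexTable: Seq[VertexRow] =
    Seq(VertexRow(1L, "a"), VertexRow(2L, "b"), VertexRow(3L, "c"), VertexRow(4L, "d"))

  val edgeTable: Seq[EdgeRow] = Seq(
    EdgeRow(pid = 0, src = 1L, dst = 2L, data = 1),
    EdgeRow(pid = 0, src = 1L, dst = 3L, data = 1),
    EdgeRow(pid = 1, src = 3L, dst = 4L, data = 1)
  )
}
```

In this sample, vertex 3 gets two routing rows, one per edge partition that references it, which is exactly the situation in which a master copy has to push updates to its mirrors.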





  • Build a graph from edges (Graph.fromEdges)
  /**
   * Construct a graph from a collection of edges.
   * @param edges the RDD containing the set of edges in the graph
   * @param defaultValue the default vertex attribute to use for each vertex
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
   * @return a graph with edge attributes described by `edges` and vertices
   *         given by all vertices in `edges` with value `defaultValue`
   */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
    GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  }
  • Build a graph from the two endpoint IDs of each edge (Graph.fromEdgeTuples)
  /**
   * Construct a graph from a collection of edges encoded as vertex id pairs.
   * @param rawEdges a collection of edges in (src, dst) form
   * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
   * @param uniqueEdges if multiple identical edges are found they are combined and the edge
   * attribute is set to the sum.  Otherwise duplicate edges are treated as separate. To enable
   * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
   * @return a graph with edge attributes containing either the count of duplicate edges or 1
   * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = {
    // Every (src, dst) pair becomes an edge whose attribute is 1
    val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
    uniqueEdges match {
      // Co-locate identical edges, then sum their attributes
      case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
      case None => graph
    }
  }
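
The effect of `uniqueEdges` on edge attributes can be shown on a plain collection. This is a hedged sketch rather than GraphX code: `DuplicateEdges` and its methods are hypothetical, and it only mimics the "start every attribute at 1, then sum identical edges" behaviour described in the Scaladoc above.

```scala
// Plain-Scala illustration of how duplicate (src, dst) pairs are treated.
// Hypothetical helper, not part of GraphX.
object DuplicateEdges {
  // uniqueEdges = None: every pair stays a separate edge with attribute 1.
  def keepSeparate(rawEdges: Seq[(Long, Long)]): Seq[((Long, Long), Int)] =
    rawEdges.map(p => (p, 1))

  // uniqueEdges = Some(strategy): identical pairs are merged with (a, b) => a + b,
  // so the attribute ends up as the number of duplicates.
  def mergeIdentical(rawEdges: Seq[(Long, Long)]): Map[(Long, Long), Int] =
    keepSeparate(rawEdges)
      .groupBy(_._1)
      .map { case (edge, group) => edge -> group.map(_._2).reduce((a, b) => a + b) }
}
```

In GraphX itself the merge only works after `partitionBy`, because `groupEdges` combines edges within a partition; the sketch ignores partitioning entirely.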


  /**
   * Create a graph from edges, setting referenced vertices to `defaultVertexAttr`.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD,
      edgeStorageLevel: StorageLevel,
      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
    fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
  }

  /**
   * Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD,
      edgeStorageLevel: StorageLevel,
      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
    val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
    val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
    GraphImpl(vertexRDD, edgeRDD)
  }

  /**
   * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
   * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
   * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: VertexRDD[VD],
      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
    // Convert the vertex partitions in edges to the correct type
    val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
      .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])

    GraphImpl.fromExistingRDDs(vertices, newEdges)
  }




  • Load the data from a file and convert each line into a tuple (srcId, dstId)
val rawEdgesRdd: RDD[(Long, Long)] =
    sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map {
      case line =>
        val ss = line.split(",")
        val src = ss(0).toLong
        val dst = ss(1).toLong
        // Always put the smaller ID first
        if (src < dst)
          (src, dst)
        else
          (dst, src)
    }
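
The per-line logic of the loading step can be tried out without a SparkContext. The sketch below is an assumption-laden stand-in: `EdgeParsing`, `parseEdge`, and `parseAll` are hypothetical names, and a `Seq[String]` takes the place of `sc.textFile(input)`.

```scala
// Plain-Scala version of the per-line parsing used when loading edges.
// Hypothetical helper names; a Seq[String] stands in for the text file RDD.
object EdgeParsing {
  // Parse one "src,dst" line into a pair with the smaller id first.
  def parseEdge(line: String): (Long, Long) = {
    val ss = line.split(",")
    val src = ss(0).trim.toLong
    val dst = ss(1).trim.toLong
    if (src < dst) (src, dst) else (dst, src)
  }

  // Drop the placeholder line "0,0" and parse everything else.
  def parseAll(lines: Seq[String]): Seq[(Long, Long)] =
    lines.filter(_ != "0,0").map(parseEdge)
}
```

Putting the smaller ID first means that "5,2" and "2,5" produce the same tuple, so the direction recorded in the input file is discarded.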
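  • Entry point: call Graph.fromEdgeTuples(rawEdgesRdd)
    Each source record is a pair of vertex IDs. Every record is mapped to an Edge(srcId, dstId, attr) object whose attr defaults to 1, which turns the source data into an RDD[Edge[ED]], as in the following line:
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))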
  • The RDD[Edge[ED]] is then converted into an EdgeRDDImpl[ED, VD]
  val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
def apply[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD,
      edgeStorageLevel: StorageLevel,
      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
    fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

Before apply calls fromEdgeRDD, it calls EdgeRDD.fromEdges(edges) to convert the RDD[Edge[ED]] into an EdgeRDDImpl[ED, VD].

def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
    // Build one EdgePartition per RDD partition, keyed by the partition ID
    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
      val builder = new EdgePartitionBuilder[ED, VD]
      iter.foreach { e =>
        builder.add(e.srcId, e.dstId, e.attr)
      }
      Iterator((pid, builder.toEdgePartition))
    }
    EdgeRDD.fromEdgePartitions(edgePartitions)
}


def toEdgePartition: EdgePartition[ED, VD] = {
    val edgeArray = edges.trim().array
    // Step 1: sort the edges by (srcId, dstId)
    new Sorter(Edge.edgeArraySortDataFormat[ED])
      .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
    val localSrcIds = new Array[Int](edgeArray.length)
    val localDstIds = new Array[Int](edgeArray.length)
    val data = new Array[ED](edgeArray.length)
    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
    val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
    val local2global = new PrimitiveVector[VertexId]
    var vertexAttrs = Array.empty[VD]
    // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
    // adding them to the index. Also populate a map from vertex id to a sequential local offset.
    if (edgeArray.length > 0) {
      index.update(edgeArray(0).srcId, 0)
      var currSrcId: VertexId = edgeArray(0).srcId
      var currLocalId = -1
      var i = 0
      while (i < edgeArray.length) {
        val srcId = edgeArray(i).srcId
        val dstId = edgeArray(i).dstId
        localSrcIds(i) = global2local.changeValue(srcId,
          { currLocalId += 1; local2global += srcId; currLocalId }, identity)
        localDstIds(i) = global2local.changeValue(dstId,
          { currLocalId += 1; local2global += dstId; currLocalId }, identity)
        data(i) = edgeArray(i).attr
        if (srcId != currSrcId) {
          currSrcId = srcId
          index.update(currSrcId, i)
        }

        i += 1
      }
      vertexAttrs = new Array[VD](currLocalId + 1)
    }
    new EdgePartition(
      localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
      None)
}

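  • The first step of toEdgePartition is to sort the edges.

  • The second step of toEdgePartition is to populate localSrcIds, localDstIds, data, index, global2local, local2global, and vertexAttrs.

global2local is a GraphXPrimitiveKeyOpenHashMap, a simple and fast hash map for primitive keys, that stores the mapping from each VertexId to its local index. It covers every srcId and dstId that occurs in the current partition.

The same srcId can be paired with several dstIds, so after sorting by srcId the edges that share a srcId occupy consecutive rows (the "index desc" part of the original figure). index records, for each distinct srcId, the offset of the first row in which that srcId appears.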



// Look up a VertexId from a local offset
localSrcIds/localDstIds -> index -> local2global -> VertexId
// Look up the local offset, and from it the attribute, from a VertexId
VertexId -> global2local -> index -> data -> attr object
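
To make the columnar layout concrete, here is a minimal plain-Scala sketch of the two steps (sort, then populate). It is not the GraphX implementation: `EdgePartitionSketch`, `Columns`, and `build` are hypothetical names, standard collections replace GraphXPrimitiveKeyOpenHashMap and PrimitiveVector, edge attributes are assumed to be Int, and vertexAttrs is left out.

```scala
import scala.collection.mutable

// Plain-Scala sketch of the columnar structures built by toEdgePartition.
// Hypothetical names; standard collections replace the GraphX primitives.
object EdgePartitionSketch {
  final case class Columns(
      localSrcIds: Array[Int],         // local id of the source of row i
      localDstIds: Array[Int],         // local id of the destination of row i
      data: Array[Int],                // edge attribute of row i
      index: Map[Long, Int],           // srcId -> first row with that srcId
      global2local: Map[Long, Int],    // VertexId -> local id
      local2global: Array[Long])       // local id -> VertexId

  def build(edges: Seq[(Long, Long, Int)]): Columns = {
    // Step 1: sort by (srcId, dstId) so equal srcIds are adjacent.
    val sorted = edges.sortBy(e => (e._1, e._2)).toArray
    val localSrcIds = new Array[Int](sorted.length)
    val localDstIds = new Array[Int](sorted.length)
    val data = new Array[Int](sorted.length)
    val index = mutable.Map.empty[Long, Int]
    val global2local = mutable.Map.empty[Long, Int]
    val local2global = mutable.ArrayBuffer.empty[Long]

    // Assign the next sequential local id the first time a vertex id is seen.
    def localId(vid: Long): Int =
      global2local.getOrElseUpdate(vid, { local2global += vid; local2global.length - 1 })

    // Step 2: fill the columns row by row.
    var i = 0
    while (i < sorted.length) {
      val (srcId, dstId, attr) = sorted(i)
      localSrcIds(i) = localId(srcId)
      localDstIds(i) = localId(dstId)
      data(i) = attr
      if (!index.contains(srcId)) index(srcId) = i
      i += 1
    }
    Columns(localSrcIds, localDstIds, data, index.toMap, global2local.toMap, local2global.toArray)
  }
}
```

With this layout, `local2global(localSrcIds(i))` recovers the source VertexId of row i, and `index(srcId)` gives the first row of the cluster of edges that start at `srcId`.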





def fromEdges[VD: ClassTag](
      edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
    // 1. Create the routing tables
    val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
    // 2. Build the vertexPartitions from the routing tables
    val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
      val routingTable =
        if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
      Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
    }, preservesPartitioning = true)
    // 3. Create the VertexRDDImpl object
    new VertexRDDImpl(vertexPartitions)
}
  • Create the routing table
private[graphx] def createRoutingTables(
      edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
    // Convert the data in each edge partition into RoutingTableMessage values
    val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
      Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
    // ...
}

def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
    : Iterator[RoutingTableMessage] = {
    // Record for every vertex whether it occurs as a source (0x1), a destination (0x2), or both
    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
    edgePartition.iterator.foreach { e =>
      map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
      map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
    }
    map.iterator.map { vidAndPosition =>
      val vid = vidAndPosition._1
      val position = vidAndPosition._2
      toMessage(vid, pid, position)
    }
}

private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
    // Upper 2 bits hold the position flags, lower 30 bits hold the partition ID
    val positionUpper2 = position << 30
    val pidLower30 = pid & 0x3FFFFFFF
    (vid, positionUpper2 | pidLower30)
}
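
The bit layout used by toMessage can be checked in isolation. The sketch below is illustrative only: `RoutingMessage` and its methods are hypothetical names, and the decoder uses an unsigned shift (`>>>`) so that the position comes back as a value between 0 and 3, which is a choice made for this example.

```scala
// Plain-Scala sketch of packing a partition id and position flags into one Int.
// Hypothetical helper; the layout follows toMessage above.
object RoutingMessage {
  val SrcFlag: Byte = 0x1
  val DstFlag: Byte = 0x2

  // Upper 2 bits: position flags. Lower 30 bits: edge partition id.
  def pack(pid: Int, position: Byte): Int =
    (position << 30) | (pid & 0x3FFFFFFF)

  // Mask away the two flag bits to recover the partition id.
  def pid(packed: Int): Int = packed & 0x3FFFFFFF

  // Unsigned shift, so the result is 0, 1, 2, or 3.
  def position(packed: Int): Byte = (packed >>> 30).toByte

  def isSrc(packed: Int): Boolean = (position(packed) & SrcFlag) != 0
  def isDst(packed: Int): Boolean = (position(packed) & DstFlag) != 0
}
```

Because only 30 bits remain for the partition ID, this encoding supports at most 2^30 edge partitions; the two flags cost no extra field in the message.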

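  • Generate the partition objects from the routing table

private[graphx] def createRoutingTables(
      edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
    // ... (vid2pid holds the RoutingTableMessage values built from the edge partitions)
    val numEdgePartitions = edges.partitions.size
    // Repartition the messages by vertex ID, then build one routing table per vertex partition
    vid2pid.partitionBy(vertexPartitioner).mapPartitions(
      iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
      preservesPartitioning = true)
}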


def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
    : RoutingTablePartition = {
    val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
    val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
    for (msg <- iter) {
      val vid = vidFromMessage(msg)
      val pid = pidFromMessage(msg)
      val position = positionFromMessage(msg)
      pid2vid(pid) += vid
      srcFlags(pid) += (position & 0x1) != 0
      dstFlags(pid) += (position & 0x2) != 0
    }

    new RoutingTablePartition(pid2vid.zipWithIndex.map {
      case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
    })
}

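This method reads the RoutingTableMessage values and repackages the vid, the edge partition pid, and the isSrcId/isDstId flags into three data structures: pid2vid, srcFlags, and dstFlags. Together they describe how the vertices of the current vertex partition are distributed across the edge partitions.
After repartitioning, the vertices in a new partition may come from different edge partitions. To find the edges of a vertex, first determine the edge partition ID pid, then determine within that edge partition whether the vertex occurs as a srcId or as a dstId; that locates the edge.
Each new partition stores records of the form (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid))). The flags are converted with toBitSet to save space.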
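
The grouping performed by fromMsgs can be sketched on plain collections. This is an illustration under stated assumptions, not the GraphX class: `RoutingTableSketch` and `Entry` are hypothetical names, messages are given as already decoded (vid, pid, position) triples, and `Vector[Boolean]` stands in for the BitSet.

```scala
import scala.collection.mutable

// Plain-Scala sketch of grouping routing messages by edge partition.
// Hypothetical names; Vector[Boolean] stands in for the BitSet used by GraphX.
object RoutingTableSketch {
  // One entry per edge partition: the vertex ids it needs, with src/dst flags per vertex.
  final case class Entry(vids: Vector[Long], isSrc: Vector[Boolean], isDst: Vector[Boolean])

  // msgs: decoded (vertex id, edge partition id, position flags) triples.
  def fromMsgs(numEdgePartitions: Int, msgs: Seq[(Long, Int, Int)]): Vector[Entry] = {
    val pid2vid = Array.fill(numEdgePartitions)(mutable.ArrayBuffer.empty[Long])
    val srcFlags = Array.fill(numEdgePartitions)(mutable.ArrayBuffer.empty[Boolean])
    val dstFlags = Array.fill(numEdgePartitions)(mutable.ArrayBuffer.empty[Boolean])
    for ((vid, pid, position) <- msgs) {
      pid2vid(pid) += vid
      srcFlags(pid) += (position & 0x1) != 0
      dstFlags(pid) += (position & 0x2) != 0
    }
    Vector.tabulate(numEdgePartitions) { pid =>
      Entry(pid2vid(pid).toVector, srcFlags(pid).toVector, dstFlags(pid).toVector)
    }
  }
}
```

The result is indexed by edge partition ID, so a vertex partition can tell at once which of its vertices must be shipped to a given edge partition and in which role.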


def apply[VD: ClassTag](
      iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
      mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
    // Merge the given vertices
    iter.foreach { pair =>
      map.setMerge(pair._1, pair._2, mergeFunc)
    }
    // Fill in missing vertices mentioned in the routing table
    routingTable.iterator.foreach { vid =>
      map.changeValue(vid, defaultVal, identity)
    }
    new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}

class ShippableVertexPartition[VD: ClassTag](
    val index: VertexIdToIndexMap,
    val values: Array[VD],
    val mask: BitSet,
    val routingTable: RoutingTablePartition)
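
The two passes of this constructor (merge duplicates, then fill in defaults) can be reproduced with a standard mutable map. The following is a hedged sketch: `VertexPartitionSketch.build` is a hypothetical function, and a plain `Seq[Long]` of routed vertex IDs stands in for `routingTable.iterator`.

```scala
import scala.collection.mutable

// Plain-Scala sketch of the merge-then-fill logic of ShippableVertexPartition.apply.
// Hypothetical helper; routedVids stands in for routingTable.iterator.
object VertexPartitionSketch {
  def build[VD](
      vertices: Seq[(Long, VD)],
      routedVids: Seq[Long],
      defaultVal: VD,
      mergeFunc: (VD, VD) => VD): Map[Long, VD] = {
    val map = mutable.Map.empty[Long, VD]
    // Pass 1: merge the attributes of vertices that appear more than once.
    vertices.foreach { case (vid, attr) =>
      map(vid) = map.get(vid).map(old => mergeFunc(old, attr)).getOrElse(attr)
    }
    // Pass 2: vertices known only from the routing table get the default value;
    // vertices that already have an attribute are left untouched.
    routedVids.foreach { vid => map.getOrElseUpdate(vid, defaultVal) }
    map.toMap
  }
}
```

When the graph is built from edges only, the vertex iterator is empty, so every vertex comes from the second pass and carries `defaultVal`.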



Building from an EdgeRDD and a VertexRDD: calling new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) produces the Graph object.

