(转)GraphX源码解析(Graph构建过程)

0. Graph构建

Graph对象是用户的操作入口,主要包含edge和vertex两部分。边是由点组成,所以边中所有的点就是点的全集,但这个全集包含了重复的点,去重后就是VertexRDD。

1. 构建图的方法

从边的集合构建图(Graph.fromEdges)

def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

从源点和目的点的元组构建(Graph.fromEdgeTuples)

  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

从具有属性的顶点和边的RDD构建(Graph())

 def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

三种方法最后都是调用了伴生对象GraphImpl的apply()方法,主要包括edgeRDD和vertexRDD 的构建,vertexRDD是从edgeRDD基础上构建起来的。

  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD,
      edgeStorageLevel: StorageLevel,
      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
    val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
      .withTargetStorageLevel(edgeStorageLevel)
    val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
      .withTargetStorageLevel(vertexStorageLevel)
    GraphImpl(vertexRDD, edgeRDD)
  }
  1. 构建EdgeRDD
    2.1 从HDFS加载文本文件
    从分布式文件系统(HDFS)中加载文本,按行处理成元组形式,即(srcId, dstId)。
    val rawEdgesRdd: RDD[(Long, Long)] = sc.textFile(input, partNum).repartition(partNum).map {
      case line =>
        val sd = line.split(",")
        val src = sd(0).toLong
        val dst = sd(1).toLong
    }.distinct()

数据格式如下:

0,1
2,3
4,1
5,1
8,2
3,5
...

2.2 详细构建过程

第一步:Graph.fromEdge(edges)

首先从已经构建好的RDD[Edge[ED]]来开始整个EdgeRDD的构建。Edge在文件Edge.scala中定义,主要存储了边的三种类型数据:srcId, dstId, attr。

case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED])
  extends Serializable

第二步:EdgeRDD.fromEdges(edges)

遍历RDD[Edge[ED]]的所有分区,开始重新构建边的存储方式。

第三步:EdgePartitionBuilder[ED, VD]

EdgePartitionBuilder是边的物理存储结构,具体存储结构的关系图如下:

(勘误:localDstIds表中最后一行数据的local值为4,应该修改为5)

源码如下:

private[graphx]
class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
    size: Int = 64) {
  private[this] val edges = new PrimitiveVector[Edge[ED]](size)

  /* 将一条边加入进去*/
  def add(src: VertexId, dst: VertexId, d: ED) {
    edges += Edge(src, dst, d)
  }

  // 上述add执行完成后,会调用下面的toEdgePartition方法形成EdgePartition
  // 下面就是GraphX中图数据在分区内部的存储结构
  def toEdgePartition: EdgePartition[ED, VD] = {
    val edgeArray = edges.trim().array
    new Sorter(Edge.edgeArraySortDataFormat[ED])
      .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) // 将图进行快速排序,先按源点排,再按照目的点排
    val localSrcIds = new Array[Int](edgeArray.size)
    val localDstIds = new Array[Int](edgeArray.size)
    val data = new Array[ED](edgeArray.size)  // 存储权值
    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]  // 保存相同srcId的第一个索引值
    val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
    val local2global = new PrimitiveVector[VertexId] // 记录源点和所有目的点
    var vertexAttrs = Array.empty[VD]  // 顶点属性

    // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
    // adding them to the index. Also populate a map from vertex id to a sequential local offset.

    // 构建边结构
    if (edgeArray.length > 0) {
      index.update(edgeArray(0).srcId, 0)
      var currSrcId: VertexId = edgeArray(0).srcId
      var currLocalId = -1
      var i = 0
      while (i < edgeArray.size) {
        val srcId = edgeArray(i).srcId  // 获取第i个点的src
        val dstId = edgeArray(i).dstId  // 获取第i个点的dst

        // 序号是递增
        // chageValue方法:若srcId不存在,则执行大括号中的内容,并将currLocalId作为global2local的value
        // local2global 只会记录一次源点
        // loaclSrcIds 中记录是源点在global2local中存的索引值,即currLocalId的结果
        localSrcIds(i) = global2local.changeValue(srcId,
          { currLocalId += 1; local2global += srcId; currLocalId }, identity) // identity相同

        // 序号是递增
        // 将目的点ID和currLocalId的值存储到global2local中
        // 并同时更新localDstIds对应的存储结果
        localDstIds(i) = global2local.changeValue(dstId,
          { currLocalId += 1; local2global += dstId; currLocalId }, identity)


        // 序号是递增
        data(i) = edgeArray(i).attr  // 存储第i个点的属性值

        // index中记录某个源点ID第一次出现的下标
        if (srcId != currSrcId) {
          currSrcId = srcId
          index.update(currSrcId, i)
        }

        i += 1
      }
      vertexAttrs = new Array[VD](currLocalId + 1)
    }

    new EdgePartition(
      localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
      None)
  }
}

第四步:toEdgePartition

分区内将图进行快速排序,先按源点排序,再按照目的点排序,new Sorter(Edge.edgeArraySortDataFormat[ED]).sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)。关于为什么要排序的原因,因为顶点的存储使用数组,数据是连续的内存空间,顺序访问时,访问速度更快。

内部存储主要有如下7个数据结构,下面由简到难依次介绍。
(1)data:存储当前分区所有边的attr的属性数组。

(2)vertexAttrs:用来存储顶点的数组,toEdgePartition后为空。

(3)index:相同srcId的第一次出现的srcId和其下标。

(4、5)localSrcIds/loacalDstIds:是glocal2local.changeValue()返回的一个本地索引,这里实际的顶点的ID称为global,对应的索引称为local。

(6)global2local:是spark私有的Map数据结构GraphXPrimitiveKeyOpenHashMap,保存vertextId和本地索引的映射关系。其中包含当前partition中所有srcId、dstId与本地索引的映射关系。

(7)localg2lobal:记录的是所有的VertexId的数组。其中会包含相同的ID。即:当前分区所有vertextId的顺序实际值。

用途:

根据本地下标取VertexId
localSrcIds/localDstIds -> index -> local2global -> VertexId

根据VertexId取本地下标,取属性
VertexId -> global2local -> index -> data -> attr object

3. 构建VertexRDD

第一步:VertexRDD.fromEdges()

构建VertexRDD入口是:val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size,defaultVertexAttr).withTargetStorageLevel(vertexStorageLevel),点是以EdgeRDD[ED, VD]为基础来构建的。为了能通过点找到边,每个点都需要保存点所在边的信息即分区ID(pid),这些信息保存在路由表RoutingTablePartition中。

物理存储结构如下所示:


image.png

第二步:RoutingTablePartition.edgePartitionToMsgs

该方法返回RoutingTableMessage类型的迭代器,对应的数据类型是包含vid和int的tuple类型:(VertexId, Int),为了节省内存,把edgePartitionId和一个标志位通过一个32位的int表示。int的32~31位表示一个标志位,01: isSrcId,10: isDstId。30~0位表示边分区ID。

val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(Function.tupled(RoutingTablePartition.edgePartitionToMsgs))).setName("VertexRDD.createRoutingTables - vid2pid (aggregation)")

第三步:RoutingTablePartition.fromMsgs

(1)将上面生成的消息路由表信息进行重新分区,分区数保持和edge的分区数一致。

val numEdgePartitions = edges.partitions.size
vid2pid.partitionBy(vertexPartitioner).mapPartitions(iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)), preservesPartitioning = true)

(2)在新分区中,mapPartition的数据,从RoutingTableMessage解出数据:vid,edge pid,isSrcId/isDstId。这个三个数据项重新封装到三个数据结构中:pid2vid,srcFlags,dstFlags。

(3)ShippableVertexPartition

根据上面routingTables,重新封装路由表里的数据结构为:ShippableVertexPartition。ShippableVertexPartition会合并相同重复点的属性attr对象,补全缺失的attr对象。得到的对象是ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable),包括keyset,values和routingTable。

(4)new VertexRDDImpl()
创建完对象后会生成VertexRDD。

  1. 生成Graph对象
    把上述edgeRDD和vertexRDD拿过来组成Graph
new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350