1. Introduction to GraphX
Spark GraphX is Spark's component for graphs and graph-parallel computation. GraphX extends the Spark RDD by introducing the resilient distributed property graph: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and graph builders that simplify graph analytics tasks.
The core abstraction of GraphX is the resilient distributed property graph: a directed multigraph with user-defined objects attached to each vertex and edge. In a directed multigraph, multiple parallel edges may share the same source and destination vertices. Supporting parallel edges simplifies modeling scenarios where several relationships can exist between the same pair of vertices. Each vertex is keyed by a unique 64-bit long identifier (VertexId); GraphX does not impose any ordering on vertex identifiers. Edges, in turn, carry the identifiers of their source and destination vertices.
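To make the abstraction concrete, here is a minimal sketch (assuming an existing SparkContext named sc; the vertex and edge data are illustrative only) that builds a small property graph:
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Vertices carry (name, role) attributes; edges carry a relationship string.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Parallel edges between the same pair of vertices would also be allowed.
val graph: Graph[(String, String), String] = Graph(users, relationships)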
2. GraphX's Graph Storage Model
GraphX stores graphs using vertex-cut partitioning. Under this scheme, any given edge lives on exactly one machine, while a vertex may be replicated across several machines. When a vertex is replicated, all copies are identical mirrors, but one copy acts as the master and the rest as mirrors: when the vertex's data changes, the master is updated first, and the updated data is then shipped to every machine holding a mirror to bring the mirrors up to date.
Three RDDs store the graph data:
- VertexTable(id, data): id is the vertex id, data is the vertex attribute.
- EdgeTable(pid, src, dst, data): pid is the partition id, src is the source vertex id, dst is the destination vertex id, data is the edge attribute.
- RoutingTable(id, pid): id is the vertex id, pid is the partition id.
The vertex-cut storage implementation is illustrated in the figure below.
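From user code, the edge-partitioning side of this layout can be chosen with a PartitionStrategy. A minimal, hedged sketch (reusing the graph built earlier; the strategy choice is illustrative):
import org.apache.spark.graphx.PartitionStrategy

// Repartition the edges with a built-in vertex-cut strategy. Each edge lands
// in exactly one partition; a vertex incident to edges in several partitions
// is mirrored into each of them.
val partitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)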
3. Building the Graph
Graph construction source code
There are two entry points for building a graph: from a collection of edges, and from a collection of edge endpoint pairs.
- Building a graph from edges (Graph.fromEdges)
/**
 * Construct a graph from a collection of edges.
 *
 * @param edges the RDD containing the set of edges in the graph
 * @param defaultValue the default vertex attribute to use for each vertex
 * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
 * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
 *
 * @return a graph with edge attributes described by `edges` and vertices
 *         given by all vertices in `edges` with value `defaultValue`
 */
def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}
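A minimal usage sketch (assuming a SparkContext sc; the names are illustrative):
import org.apache.spark.graphx.{Edge, Graph}

// Every vertex referenced by the edges gets the default attribute 1.
val edgeRDD = sc.parallelize(Seq(Edge(1L, 2L, "follows"), Edge(2L, 3L, "follows")))
val fromEdgesGraph = Graph.fromEdges(edgeRDD, defaultValue = 1)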
- Building a graph from edge endpoint pairs (Graph.fromEdgeTuples)
/**
 * Construct a graph from a collection of edges encoded as vertex id pairs.
 *
 * @param rawEdges a collection of edges in (src, dst) form
 * @param defaultValue the vertex attributes with which to create vertices referenced by the edges
 * @param uniqueEdges if multiple identical edges are found they are combined and the edge
 *        attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
 *        `uniqueEdges`, a [[PartitionStrategy]] must be provided.
 * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
 * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
 *
 * @return a graph with edge attributes containing either the count of duplicate edges or 1
 *         (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
 */
def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = {
  val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
  val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
  uniqueEdges match {
    case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
    case None => graph
  }
}
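A minimal usage sketch of the duplicate-merging behavior (names are illustrative):
import org.apache.spark.graphx.{Graph, PartitionStrategy}

// The two identical (1L, 2L) pairs collapse into one edge with attribute 2.
val rawPairs = sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))
val dedupedGraph = Graph.fromEdgeTuples(rawPairs, defaultValue = 0,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut))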
Both entry points go through the apply methods of Graph's implementation class, GraphImpl:
/**
 * Create a graph from edges, setting referenced vertices to `defaultVertexAttr`.
 */
def apply[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD,
    edgeStorageLevel: StorageLevel,
    vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

/**
 * Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`.
 */
def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD,
    edgeStorageLevel: StorageLevel,
    vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
    .withTargetStorageLevel(edgeStorageLevel)
  val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
    .withTargetStorageLevel(vertexStorageLevel)
  GraphImpl(vertexRDD, edgeRDD)
}

/**
 * Create a graph from a VertexRDD and an EdgeRDD with arbitrary replicated vertices. The
 * VertexRDD must already be set up for efficient joins with the EdgeRDD by calling
 * `VertexRDD.withEdges` or an appropriate VertexRDD constructor.
 */
def apply[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
  vertices.cache()
  // Convert the vertex partitions in edges to the correct type
  val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
    .cache()
  GraphImpl.fromExistingRDDs(vertices, newEdges)
}
The graph construction process
Building a graph involves three main steps: building the edge EdgeRDD, building the vertex VertexRDD, and creating the Graph object.
Building the EdgeRDD
- Load the data from a file and convert it into (srcId, dstId) tuples:
val rawEdgesRdd: RDD[(Long, Long)] =
  sc.textFile(input).filter(s => s != "0,0").repartition(partitionNum).map { line =>
    val ss = line.split(",")
    val src = ss(0).toLong
    val dst = ss(1).toLong
    if (src < dst) (src, dst) else (dst, src)
  }.distinct()
- Entry point: call Graph.fromEdgeTuples(rawEdgesRdd).
The source data consists of pairs of vertex IDs; each pair is mapped to an Edge(srcId, dstId, attr) object, with attr defaulting to 1. This turns the raw data into an RDD[Edge[ED]], as in the following code:
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- Convert the RDD[Edge[ED]] into an EdgeRDDImpl[ED, VD].
After the RDD[Edge[ED]] has been built in the previous step, GraphX calls GraphImpl's apply method to build the Graph:
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)

def apply[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD,
    edgeStorageLevel: StorageLevel,
    vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
Inside apply, before fromEdgeRDD is called, EdgeRDD.fromEdges(edges) converts the RDD[Edge[ED]] into an EdgeRDDImpl[ED, VD]:
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD] = {
  val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
    val builder = new EdgePartitionBuilder[ED, VD]
    iter.foreach { e =>
      builder.add(e.srcId, e.dstId, e.attr)
    }
    Iterator((pid, builder.toEdgePartition))
  }
  EdgeRDD.fromEdgePartitions(edgePartitions)
}
The program iterates over each partition of the RDD[Edge[ED]] and calls EdgePartitionBuilder.toEdgePartition to process the edges within that partition:
def toEdgePartition: EdgePartition[ED, VD] = {
  val edgeArray = edges.trim().array
  new Sorter(Edge.edgeArraySortDataFormat[ED])
    .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering)
  val localSrcIds = new Array[Int](edgeArray.length)
  val localDstIds = new Array[Int](edgeArray.length)
  val data = new Array[ED](edgeArray.length)
  val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val global2local = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
  val local2global = new PrimitiveVector[VertexId]
  var vertexAttrs = Array.empty[VD]
  // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
  // adding them to the index. Also populate a map from vertex id to a sequential local offset.
  if (edgeArray.length > 0) {
    index.update(edgeArray(0).srcId, 0)
    var currSrcId: VertexId = edgeArray(0).srcId
    var currLocalId = -1
    var i = 0
    while (i < edgeArray.length) {
      val srcId = edgeArray(i).srcId
      val dstId = edgeArray(i).dstId
      localSrcIds(i) = global2local.changeValue(srcId,
        { currLocalId += 1; local2global += srcId; currLocalId }, identity)
      localDstIds(i) = global2local.changeValue(dstId,
        { currLocalId += 1; local2global += dstId; currLocalId }, identity)
      data(i) = edgeArray(i).attr
      if (srcId != currSrcId) {
        currSrcId = srcId
        index.update(currSrcId, i)
      }
      i += 1
    }
    vertexAttrs = new Array[VD](currLocalId + 1)
  }
  new EdgePartition(
    localSrcIds, localDstIds, data, index, global2local, local2global.trim().array, vertexAttrs,
    None)
}
- The first step of toEdgePartition sorts the edges.
The edges are sorted by srcId in ascending order, so that traversal becomes sequential access, which is fast. Arrays are used instead of a Map because an array occupies contiguous memory and avoids the hashing overhead of a Map, giving faster access.
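A hedged illustration of the ordering, using a plain sortBy instead of GraphX's internal Sorter:
import org.apache.spark.graphx.Edge

// After sorting by (srcId, dstId), edges sharing a srcId are adjacent, so
// `index` only needs the offset of the first edge for each distinct srcId.
val es = Array(Edge(2L, 1L, "x"), Edge(1L, 3L, "y"), Edge(1L, 2L, "z"))
val sortedEdges = es.sortBy(e => (e.srcId, e.dstId))
// sortedEdges: Edge(1,2,z), Edge(1,3,y), Edge(2,1,x)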
- The second step of toEdgePartition fills in localSrcIds, localDstIds, data, index, global2local, local2global, and vertexAttrs.
The arrays localSrcIds and localDstIds hold the partition-local indices produced by global2local.changeValue(srcId/dstId). An index stored in localSrcIds or localDstIds can be resolved to a concrete VertexId via local2global.
global2local is a simple, fast hash map with non-negative keys (GraphXPrimitiveKeyOpenHashMap) that maps a vertexId to its local index. It holds the mapping from every srcId and dstId in the current partition to a local index.
data is the array of edge attr values for the current partition.
The same srcId may pair with different dstIds. After sorting by srcId, the same srcId appears on several consecutive rows, as in the index portion of the figure above. index records, for each distinct srcId, that srcId together with the offset of its first occurrence.
local2global is an array recording all VertexIds, of the form srcId, dstId, srcId, dstId, srcId, dstId, ..., possibly containing repeated srcIds; in other words, it holds the actual vertexIds of the current partition in order.
We can resolve a local index to a VertexId, or resolve a VertexId to a local index and fetch the corresponding attribute:
// From a local index to a VertexId
localSrcIds/localDstIds -> index -> local2global -> VertexId
// From a VertexId to a local index and its attribute
VertexId -> global2local -> index -> data -> attr object
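To make the columnar layout concrete, here is a self-contained, hedged sketch that mimics the encoding with plain Scala collections rather than GraphX's specialized structures (all names are illustrative):
// Three edges, already sorted by srcId.
val sortedPairs = Seq((1L, 4L, "a"), (1L, 5L, "b"), (2L, 4L, "c"))

val g2l = scala.collection.mutable.LinkedHashMap[Long, Int]() // plays global2local
def localId(vid: Long): Int = g2l.getOrElseUpdate(vid, g2l.size)

// One pass per edge, interleaving src and dst like the real builder does.
val (localSrc, localDst) =
  sortedPairs.map { case (s, d, _) => (localId(s), localId(d)) }.unzip
val attrs = sortedPairs.map(_._3).toArray // plays data
val l2g   = g2l.keys.toArray              // plays local2global: 1, 4, 5, 2

// Local index -> VertexId: edge 2's destination is l2g(localDst(2)), i.e. 4L.
// VertexId -> local index: g2l(5L) addresses vertex 5's slot in vertexAttrs.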
Building the VertexRDD
Building the vertex VertexRDD takes three steps, which correspond to the numbered comments below. GraphX builds the VertexRDD with VertexRDD.fromEdges, passing in the edge RDD as a parameter:
def fromEdges[VD: ClassTag](
    edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = {
  // 1. Create the routing tables
  val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions))
  // 2. Generate the partition objects (vertexPartitions) from the routing tables
  val vertexPartitions = routingTables.mapPartitions({ routingTableIter =>
    val routingTable =
      if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
    Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal))
  }, preservesPartitioning = true)
  // 3. Create the VertexRDDImpl object
  new VertexRDDImpl(vertexPartitions)
}
- Creating the routing tables
To be able to find a vertex's edges starting from the vertex, each vertex must record which edge partitions reference it; this information is kept in RoutingTablePartition.
private[graphx] def createRoutingTables(
    edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  // Convert the data in each edge partition into RoutingTableMessage values
  val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap(
    Function.tupled(RoutingTablePartition.edgePartitionToMsgs)))
  // ... continued below ...
}
First, the data in each edge partition is converted into RoutingTableMessage values, i.e. tuples of type (VertexId, Int):
def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _])
  : Iterator[RoutingTableMessage] = {
  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
  edgePartition.iterator.foreach { e =>
    map.changeValue(e.srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
    map.changeValue(e.dstId, 0x2, (b: Byte) => (b | 0x2).toByte)
  }
  map.iterator.map { vidAndPosition =>
    val vid = vidAndPosition._1
    val position = vidAndPosition._2
    toMessage(vid, pid, position)
  }
}
// Bits 29-0 hold the edge partition id; bits 31-30 hold the src/dst flags
private def toMessage(vid: VertexId, pid: PartitionID, position: Byte): RoutingTableMessage = {
  val positionUpper2 = position << 30
  val pidLower30 = pid & 0x3FFFFFFF
  (vid, positionUpper2 | pidLower30)
}
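A hedged worked example of the packing (values chosen for illustration): take pid = 5 and position = 0x3, meaning the vertex appears in that edge partition as both a src and a dst:
val pid = 5
val position: Byte = 0x3
val packed = (position << 30) | (pid & 0x3FFFFFFF) // 0xC0000005
// Decoding: the partition id comes from the low 30 bits, the flags from the
// top two bits (only their two low bits matter as src/dst flags).
val decodedPid = packed & 0x3FFFFFFF               // 5
val decodedPos = (packed >>> 30).toByte            // 0x3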
- Generating partition objects from the routing tables
private[graphx] def createRoutingTables(
    edges: EdgeRDD[_], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = {
  // ... vid2pid is built as shown above ...
  val numEdgePartitions = edges.partitions.size
  vid2pid.partitionBy(vertexPartitioner).mapPartitions(
    iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)),
    preservesPartitioning = true)
}
The resulting vid2pid is repartitioned with the HashPartitioner. Now let's look at the RoutingTablePartition.fromMsgs method:
def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage])
  : RoutingTablePartition = {
  val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
  val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean])
  for (msg <- iter) {
    val vid = vidFromMessage(msg)
    val pid = pidFromMessage(msg)
    val position = positionFromMessage(msg)
    pid2vid(pid) += vid
    srcFlags(pid) += (position & 0x1) != 0
    dstFlags(pid) += (position & 0x2) != 0
  }
  new RoutingTablePartition(pid2vid.zipWithIndex.map {
    case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid)))
  })
}
This method reads the vid, the edge pid, and the isSrcId/isDstId flags out of each RoutingTableMessage and repackages them into three data structures: pid2vid, srcFlags, and dstFlags. Together they describe how the vertices of the current vertex partition are distributed across the edge partitions.
After repartitioning, the vertices in a new partition may come from different edge partitions. So for a vertex to find its edges, one must first determine the edge partition id (pid), and then whether the vertex appears in that edge partition as a srcId or a dstId; only then can the edges be located.
The new partition stores records of the form (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid))); the flag vectors are converted with toBitSet to save space.
- From the routingTables generated above, the routing-table data is repackaged into ShippableVertexPartition objects. ShippableVertexPartition merges the attr objects of duplicate vertices and fills in missing attr objects:
def apply[VD: ClassTag](
    iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD,
    mergeFunc: (VD, VD) => VD): ShippableVertexPartition[VD] = {
  val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
  // Merge duplicate vertices
  iter.foreach { pair =>
    map.setMerge(pair._1, pair._2, mergeFunc)
  }
  // Fill in missing attribute values with the default
  routingTable.iterator.foreach { vid =>
    map.changeValue(vid, defaultVal, identity)
  }
  new ShippableVertexPartition(map.keySet, map._values, map.keySet.getBitSet, routingTable)
}

// The ShippableVertexPartition definition
class ShippableVertexPartition[VD: ClassTag](
    val index: VertexIdToIndexMap,
    val values: Array[VD],
    val mask: BitSet,
    val routingTable: RoutingTablePartition)
Here, map is the vertexId -> attr mapping; index is the vertex set, values is the array of attributes corresponding to the vertex set, and mask is the BitSet over the vertex set.
Creating the Graph object
With the edgeRDD and vertexRDD in hand, calling new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]])) produces the Graph object.
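As a hedged end-to-end check, the assembled graph exposes the usual views (reusing the graph value built in section 1; output details will vary):
// vertices, edges, and the triplets view that joins the two
graph.vertices.take(3).foreach(println)  // (VertexId, VD) pairs
graph.edges.take(3).foreach(println)     // Edge(srcId, dstId, attr)
graph.triplets.take(3).foreach(t => println(s"${t.srcId} -> ${t.dstId}: ${t.attr}"))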