全局顶点编码器
- 当空洞顶点(没有边的无效顶点)很多时,可以对顶点进行重新编码,比如有1,100这两个顶点,进行重新连续编码后为0,1。因为采样csr/csc结构在内存中存储图,所以说顶点数值范围变小后,构图就更加节省内存。
- 具体来说:在加载边文件后,对边的src,dst顶点进行全局编码,以减少顶点范围,然后再使用编完码的边进行构造图。
分布式全局编码流程
- 采样mpi通信实现分布式计算,每个计算节点在初始阶段,先对要加载的边文件进行均分,使每个计算节点都只加载部分的边文件,以增加整体加载速度,然后在对图进行分区(Edge-cut+ alpha)。而分布式全局编码是在每个计算节点加载完边数据后,在进行图分区之前做的事情。
分布式全局编码步骤:
0加载节点负责的文件,得到边数据,存放在edges buffer中
1每个计算节点将自己读取到的边顶点,根据shuffle规则发送到指定节点上,同时使用map_ids接收其它节点发送过来的顶点。
2节点将接收到存在map_ids中的顶点存到数组local_ids[]中,数组大小为map_ids中元素个数
3节点通过MPI_Allgather得到每个节点接收到的顶点数,,存在local_size[3]数组中
4通过MPI_Allreduce得到全局有效顶点个数
5通过MPI_Allreducer将全局有效顶点存到大数组global_ids中,数组大小为全局有效顶点数,此时每个顶点的所在数组下标就是顶点新的编码id.
6将global_ids中顶点存储id_map中,key=有效顶点, value=顶点在global_ids中数组下标, global_ids则每个计算节点上一直保存用于decode。
7遍历edges buffer使用id_map将边的顶点vid替换为编码后的顶点id.
8.当前图计算完成后,在输出结果时对id进行decode,通过global_ids[id]来得到id编码前的顶点vid.
shuffle规则
- 第一种(简单起见,案例使用方案): vid % nodes
- 第二种: murmur_hash2(vid, seed) % nodes
图解
分布式全局顶点编码器
其它
MPI_Allgatherv
函数讲解
MPI_Allgatherv
: 所有节点都接收所有顶点发送过来的信息,因为每个节点发送的信息个数可能不一样,所以在用接收数组recvbuf接收j节点发来的消息sendbuf时,要通过displ数组指明sendbuf存在recvbuf中的偏移位置。
- 下面代码实现就是每个计算节点将自己接收到的顶点local_ids发送给所有节点,同时用global_ids_接收所有节点发过来的数据,但是每个节点发送的local_ids数据量可能会不一样,所以使用displs来表明第j个计算节点数据存放在global_ids_中的偏移位置。为此需要在此之前计算好displs中的数值:存放计算节点中顶点数的累加和。
MPI_Allgatherv(
&local_ids[0], local_ids.size(), get_mpi_data_type<VID_T>(), &global_ids_[0],
&recvcounts[0], &displs[0], get_mpi_data_type<VID_T>(), MPI_COMM_WORLD);
if (0 == cluster_info.partition_id_) {
LOG(INFO) << "all gather cost: " << watch.show("t1") / 1000.0;
}
cuckoohash_map使用
- 在分布式全局编码中在1步中,每个节点遍历edgebuffer中的边,并根据shuffle规则将边上的顶点发送到对应节点上。此处每次发送完一个顶点要标记该顶点已经发过了,避免在后面的边中再次遇到这个顶点时重复发送。所以需要使用hash_map这种O(1)时间复杂度的结构来存放哪些顶点已经发送过了。在加上会使用多线程来操作,所以hash_map要支持并发读写。程序中使用了开源的cuckoohash_map:高性能压缩hashmap支持并发读写。(不知道和folly的ConcurrentHashMap比起来效果咋样,但是至少使用它比使用folly简单,folly依赖太多了包含内容太综合了)
- 使用案例局部代码:
#include "libcuckoo/cuckoohash_map.hh
using cuckoomap_t = cuckoohash_map<VID_T, vid_t, std::hash<VID_T>, std::equal_to<VID_T>, std::allocator<std::pair<const VID_T, vid_t> > >;
if (opts_.src_need_encode_) {
// src是否已经发送过了
bool upserted = used.upsert(edge->src_, [](vid_t&){}, 0);
if (upserted) {
// 规矩shuffle规则得到src要发送的节点
auto send_to = murmur_hash2(&(edge->src_), sizeof(VID_T)) % cluster_info.partitions_;
context.send(send_to, edge->src_);
}
}