Bootstrap 逻辑
入口代码:go-libp2p-kad-dht/dht_bootstrap.go
入口方法:
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
最终这个方法会去执行 runBootstrap 方法,真正的操作是从这里开始的
- func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
原理大概是,生成一个随机的目标 randomID ,然后在网络中 find 这个 peer ,因为肯定是找不到的,所以每个 peer 都会返回给你离这个 randomID 最近的 peers ,也就是 CloserPeers 来填充你本地的 k 桶,这样的动作默认是 5 分钟执行一次的,所以 k 桶很快就会被填满。
- func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error)
这个方法会现在本地找:dht.FindLocal(id)
本地没有就在路由表里找peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
这个方法的逻辑是,AlphaValue = 3 ,表示最低要确保发送到 3 个 peers 上
目标 id 会被 xor 出一个桶,这个桶如果是空的,或者桶里不足 3 个 peer
就会从临近的桶里返回 peer ,如果桶里 peer 超过 AlphaValue 则按桶里的总数来
奇怪的是最后又只节选了 3 个距离最近的 peer ,为什么呢?
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
cpl := commonPrefixLen(id, rt.local)
rt.tabLock.RLock()
// Get bucket at cpl index or last bucket
var bucket *Bucket
if cpl >= len(rt.Buckets) {
cpl = len(rt.Buckets) - 1
}
bucket = rt.Buckets[cpl]
peerArr := make(peerSorterArr, 0, count)
//TODO 如果这里的数据比 peerArr 多,会重新 make peerArr
peerArr = copyPeersFromList(id, peerArr, bucket.list)
if len(peerArr) < count {
// In the case of an unusual split, one bucket may be short or empty.
// if this happens, search both surrounding buckets for nearby peers
if cpl > 0 {
plist := rt.Buckets[cpl-1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
if cpl < len(rt.Buckets)-1 {
plist := rt.Buckets[cpl+1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
}
rt.tabLock.RUnlock()
// Sort by distance to local peer
sort.Sort(peerArr)
// TODO 这里为什么又截断了呢?只取了3个离的最近的,什么目的?
if count < len(peerArr) {
peerArr = peerArr[:count]
}
out := make([]peer.ID, 0, len(peerArr))
for _, p := range peerArr {
out = append(out, p.p)
}
return out
}
在 query 的同时会刷新每个有响应的 peer 的 ttl ,但是至多只向 3 个 peer 发送 query 请求
请求的返回的 closer 貌似没处理,为什么?
因为需要耐心寻找,看了一圈终于在
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
中找到了答案,这个实在是太隐蔽了,对代码的作者表示谴责
以下是 FindPeer 中的代码片段,
parent := ctx
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
return nil, err
}
closer := pmes.GetCloserPeers()
clpeerInfos := pb.PBPeersToPeerInfos(closer)
// see if we got the peer here
for _, npi := range clpeerInfos {
if npi.ID == id {
// add by liangc ---->
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerFindby,
ID: p,
})
// add by liangc <----
return &dhtQueryResult{
peer: npi,
success: true,
}, nil
}
}
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: clpeerInfos,
})
return &dhtQueryResult{closerPeers: clpeerInfos}, nil
})
// run it!
result, err := query.Run(ctx, peers)
在上面的最后一句 query.Run 中,包含了如下调用轨迹
- func (q dhtQuery) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
- func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
- func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
- func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
- func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
- func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
在 dhtQueryRunner.queryPeer中处理了closerPeers
} else if len(res.closerPeers) > 0 {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
for _, next := range res.closerPeers {
if next.ID == r.query.dht.self { // don't add self.
log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
// add their addresses to the dialer's peerstore
r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
r.addPeerToQuery(next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
}
} else {
log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
}
可以看到,每个 closer 都被放到了 peerstore 中,同时也放到了 peersSeen 中,
其实这里就是在迭代 peersSeen 来执行 run 方法,peersSeen 是线程安全的 set 类型
这就保证虽然所有的 run 都是在线程中执行的,但是主程序轮训时并不会错过 set 中的新成员
同时 set 还可以保证重复的 peer 不会被重复添加,这就可以断定 set 不会无休止增加
其实 bootstrap 到此就没什么可看的了