eth/handle.go
中的ProtocolManager
管理节点之间通信。节点与节点之间的通信,也就是区块和交易的广播或同步。
这里先介绍广播。提及广播,要先说一个有趣的协议:gossip,对,就是流言蜚语。如果有关于明星的八卦或是负面新闻,不用多长时间,可能满大街的人们就都知道了。广播就类似于流言蜚语的传播,一传十,十传百的扩散出去,最后整个网络都知晓了。
以下是ProtocolManager
实现区块和交易的广播的流程图:
接下来会一步一步介绍。
0.索引
01.广播和同步的启动
02.区块广播
03.区块广播相关源码
04.交易广播以及源码
05.异步发送区块和交易的说明
06.总结
1.广播和同步的启动
区块和交易的广播与同步由ProtocolManager
协议管理控制。启动方法为Start
。
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
// 广播交易
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
// 广播区块
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
// 开始同步
go pm.syncer()
go pm.txsyncLoop()
}
开启了4个协程:
- 1.创建了新交易事件的通道,然后开始
pm.txBroadcastLoop()
广播交易。 - 2.创建了新区块事件的通道,然后开始
pm.minedBroadcastLoop()
广播区块。 - 3.同步区块,同步的过程在
eth/sync.go
里,下一次介绍。 - 4.同步交易。
先从区块广播开始。
2.区块广播
区块广播指的是矿工挖出新的区块后,将新区块告知并发给p2p网络中的所有节点。这里涉及到两个广播过程:
- 1.矿工广播新区块
- 2.其他的中继节点广播新区快
如下图:
第一轮:
黄色的节点表示矿工,矿工挖到区块后,接下来要将区块广播出去,也就是发送给相邻的节点,这里相邻的节点有5个,两个红色的节点和三个蓝色的节点。红色的节点表示收到区块的节点,蓝色的节点表示收到区块哈希的节点。
这里红色的节点是有一定数量要求的。取的是,要广播的节点数量的平方根。要广播5个节点,5取平方根再取整为2个。也就是说矿工向这两个红色节点直接发送了区块,然后向剩余的节点发送了区块哈希。
第二轮:
接收到区块哈希的蓝色节点向发来区块哈希的节点(也就是矿工)请求下载区块,下载完区块后,就跟接收到区块的红色节点一样,向它的相邻节点发送区块和区块哈希,如第一轮的过程。
这其中会有一种情况产生,如果提前接收到了未来的区块,比如说,区块A->区块B->区块C,需要的是区块B,但是接收到了区块C的情况,这时候会将区块哈希进行广播。
第n轮:
同第二轮,直到整个网络都知晓广播的区块。
(注意:下面的源码介绍的是以矿工挖出区块后的第一次广播,也就是进行第一轮操作。交易广播亦是如此。)
3.区块广播相关源码
首先是区块广播循环minedBroadcastLoop()
。
func (pm *ProtocolManager) minedBroadcastLoop() {
// 自动停止,如果退订了通道。
for obj := range pm.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
// 广播区块。
pm.BroadcastBlock(ev.Block, true) // 首先广播区块。
pm.BroadcastBlock(ev.Block, false) // 然后只广播区块哈希。
}
}
}
区块广播循环minedBroadcastLoop()
开启了之后,会一直读取区块事件的通道,也就是如果有新的区块事件产生,就能立即知晓。
然后进行区块广播。调用了两次pm.BroadcastBlock
方法。第一次标志位为true
,给部分节点广播区块。第二次标志位为false
,只广播区块哈希。
关于pm.BroadcastBlock
方法。
根据propagate
标志位的不同设置,对应不同的区块广播方式。
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool)
- 1.先获取新的区块的哈希
hash
,和本地节点的相邻节点中,未知这个区块的节点列表peers
。hash := block.Hash() peers := pm.peers.PeersWithoutBlock(hash)
- 2.如果
propagate
字段为true
。广播区块,节点的数量为peers
的长度的平方根。transferLen := int(math.Sqrt(float64(len(peers)))) transfer := peers[:transferLen] for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) }
- 3.如果
propagate
字段为false
。广播区块哈希,节点的数量为peers
剩余的节点数量。(即没有收到区块的节点。)for _, peer := range peers { peer.AsyncSendNewBlockHash(block) }
异步发送区块或区块哈希
(代码在eth/peer.go
里)
发送区块。
在远程节点的广播队列里加入了区块事件,如果远程节点的广播队列 queuedProps
满了,则无法收到。然后标注该远程节点已知该区块。
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
case p.queuedProps <- &propEvent{block: block, td: td}:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
}
}
发送区块哈希。
与发送区块类似。在远程节点的区块哈希通知队列里加入了区块事件。然后标注该远程节点已知该区块。
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
select {
case p.queuedAnns <- block:
p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
}
}
4.交易广播以及源码
由于交易的数量比较多,所以每次广播的是一批交易。交易的广播也相对比较简单,一批新的交易,直接传给相邻节点即可。
交易广播循环 txBroadcastLoop()
循环读取交易事件通道,如果接收到新的一批交易,则广播出去。
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
pm.BroadcastTxs(event.Txs)
case <-pm.txsSub.Err():
return
}
}
}
pm.BroadcastTxs 方法
广播交易,由于是一批交易,所以要先知道相邻的节点缺少这一批交易里的哪一些交易。定义了交易集合的映射,即远程节点对应该远程节点缺少的交易列表。然后发送交易。
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
var txset = make(map[*peer]types.Transactions)
// 广播给无该交易的节点
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
txset[peer] = append(txset[peer], tx)
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
// 发送交易
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
}
异步发送交易
在远程节点的交易队列里加入了交易事件,如果远程节点的交易队列 queuedTxs
满了,则无法收到。然后标注该远程节点已知该交易。(一批交易。)
func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
select {
case p.queuedTxs <- txs:
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
default:
p.Log().Debug("Dropping transaction propagation", "count", len(txs))
}
}
5.异步发送区块和交易的说明
在进行区块广播和交易广播的时候,都是采用异步发送的形式,每个远程节点都设置了三个广播的通道,queuedProps
,区块通道,缓存为4个区块;queuedAnns
,区块哈希通道,缓存为4个区块哈希;queuedTxs
交易通道,缓存为128个交易。
需要进行区块或交易的广播的时候,将区块或交易放入远程节点相应的通道中。
远程节点读取通道内容
(代码在eth/peer.go
里)
在上层网络的peerSet
中加入新的远程节点,也就是Register
注册节点的时候,会开一个单独的协程,启动远程节点的广播方法,即go p.broadcast()
。
func (ps *peerSet) Register(p *peer) error {
...
ps.peers[p.id] = p
go p.broadcast()
return nil
}
go p.broadcast()
方法,是一个异步读取循环,每次从交易通道,或区块通道,或区块哈希通道中读取内容,然后执行对应的发送方法。
func (p *peer) broadcast() {
for {
select {
case txs := <-p.queuedTxs:
if err := p.SendTransactions(txs); err != nil {return}
...
case prop := <-p.queuedProps:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {return}
...
case block := <-p.queuedAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {return}
...
case <-p.term:
return
}
}
}
这里以发送区块为例,先标注该远程节点已知该区块,然后调用p2p.Send
方法,将区块发送给远程的节点。
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
然后是Send
方法,将区块数据进行rlp编码后置入r
中,size
为rlp编码后的数据长度。调用w.WriteMsg
方法将要发送的数据写入w
通道。
func Send(w MsgWriter, msgcode uint64, data interface{}) error {
size, r, err := rlp.EncodeToReader(data)
...
return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}
6.总结
- 1.区块和交易的广播是一种gossip的传播方式,每个节点都向相邻的节点传播,最后蔓延开去,整个p2p网络就都知晓了广播的消息。
- 2.区块广播有两个内容,分别为区块和区块哈希的广播。
- 3.交易广播的对象是一批交易。