同步,也就是区块链的数据的同步。这里分为两种同步方式,一是本地区块链与远程节点的区块链进行同步,二是将交易均匀的同步给相邻的节点。
0.索引
01.同步区块链
02.同步交易
03.总结
1.同步区块链
ProtocolManager
协议管理中的go pm.syncer()
协程。
func (pm *ProtocolManager) syncer() {
// 启动fetcher,辅助同步区块。
pm.fetcher.Start()
defer pm.fetcher.Stop()
defer pm.downloader.Terminate()
// 等待不同的事件来触发同步操作。
forceSync := time.NewTicker(forceSyncCycle) // 强制同步触发的计时器。
defer forceSync.Stop()
for {
select {
// 有新建的远程节点。
case <-pm.newPeerCh:
// 确保有节点可以选择,然后同步。数量最低为5个。
if pm.peers.Len() < minDesiredPeerCount {
break
}
go pm.synchronise(pm.peers.BestPeer())
case <-forceSync.C:
// 没有足够数量的节点,也进行强制同步。
go pm.synchronise(pm.peers.BestPeer())
case <-pm.noMorePeers:
return
}
}
}
先启动了fetcher
,辅助同步区块用的。然后等待不同的事件触发不同的同步方式。
- 以太坊的节点在新增的相邻节点数比
minDesiredPeerCount = 5
大的时候,会与相邻的节点进行区块链的同步。 - 以太坊设置了一个强制同步的机制,每经过
forceSyncCycle
10秒,则会与相邻节点进行同步。
同步的过程调用pm.synchronise
方法来进行。
func (pm *ProtocolManager) synchronise(peer *peer) {
...
// 确保远程节点比本地区块链要新。通过难度值td来做比较。
currentBlock := pm.blockchain.CurrentBlock()
td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
pHead, pTd := peer.Head()
...
// 设置同步的方式。
...
// 运行同步循环,与下载器同步。如果本地已有枢纽区块数据则取消快速同步的方式。
if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
return
}
...
// 完成了同步之后,将本地区块链的高度最高的区块广播出去,给其他节点广播的是区块哈希。
if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
go pm.BroadcastBlock(head, false)
}
}
1.先确定远程节点的区块链是否比本地节点的区块链新,新指的是区块链头的难度值更大。
-
2.然后确定同步的模式,并且下载区块。
- 如果是刚刚启动的本地节点,即本地的区块链数据库里没有数据,可以使用快速同步模式。
- 如果同步过程中已经通过
the pivot block
枢纽块,则禁用快速同步。
(同步中具体获得区块的过程是通过
pm.downloader
下载器实现的。) 3.同步完成之后,向其他节点广播本地区块链目前的状态,即将区块链头的区块哈希广播出去。
pm.BroadcastBlock(head, false)
,标志位为false
,将区块的哈希广播出去。
2.同步交易
ProtocolManager
协议管理中的go pm.txsyncLoop()
协程。
同步交易循环 txsyncLoop
分为三个部分的内容:
1.定义初始化的字段。
var (
pending = make(map[enode.ID]*txsync)
sending = false
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
-
pending
,未处理的交易。一个pending
包含很多个txsync
,txsync
由远程节点和交易列表组成。 -
sending
,交易发送的标志位。 -
pack
,交易包,正在发送中的交易。 -
done
,完成交易发送的标志位。
2.定义发送交易和挑选函数。
发送交易的函数。
send := func(s *txsync) {
// 根据设置的长度,填充pack交易包。
size := common.StorageSize(0)
pack.p = s.p
pack.txs = pack.txs[:0]
for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
pack.txs = append(pack.txs, s.txs[i])
size += s.txs[i].Size()
}
// 移除将要发送的交易。
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
delete(pending, s.p.ID())
}
// 后台发送交易包。
s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
sending = true
go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
- 1.根据
txsyncPackSize
设置的交易编码长度,打包一定数量的交易到pack
里。 - 2.打包的交易即将要发送的交易,在
s
中移除这些交易。 - 3.设置
sending
标志为true
,开启一个单独的协程发送打包的交易,发送完成后,done
通道会有数据。
挑选函数。
pick := func() *txsync {
if len(pending) == 0 {
return nil
}
n := rand.Intn(len(pending)) + 1
for _, s := range pending {
if n--; n == 0 {
return s
}
}
return nil
}
- 取出
pending
里最后一个交易包。
3.执行发送交易的循环。
for {
select {
case s := <-pm.txsyncCh:
pending[s.p.ID()] = s
if !sending {
send(s)
}
case err := <-done:
sending = false
if err != nil {
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.ID())
}
// 计划下次发送。
if s := pick(); s != nil {
send(s)
}
case <-pm.quitSync:
return
}
}
三个监听协程的case
。
- 1.
case s := <-pm.txsyncCh:
,交易同步通道里有交易后触发,在pending
里加入监听到的交易。 - 2.
case err := <-done:
,完成发送交易的时候触发,设置sending
标志位为false
。然后计划下一次的交易发送。 - 3.
case <-pm.quitSync:
,退出。
3.总结
- 1.区块同步有两方面的作用:新的本地节点启动的时候,没有区块链的数据,通过同步从相邻节点处获取到相邻节点的最佳区块链;每10秒一次进行的同步,可以确保本地区块链数据记录的时时更新,确保自身区块链保持最佳状态。
- 2.交易同步:未处理的交易,可能相邻节点没有,比如说刚启动的相邻节点,将本地的未处理交易均匀发送给相邻节点。