[以太坊源码分析][p2p网络07]:同步区块和交易

eth07.jpg

同步,也就是区块链的数据的同步。这里分为两种同步方式,一是本地区块链与远程节点的区块链进行同步,二是将交易均匀的同步给相邻的节点。

0.索引

01.同步区块链
02.同步交易
03.总结

1.同步区块链

ProtocolManager协议管理中的go pm.syncer()协程。

func (pm *ProtocolManager) syncer() {
    // 启动fetcher,辅助同步区块。
    pm.fetcher.Start()
    defer pm.fetcher.Stop()
    defer pm.downloader.Terminate()

    // 等待不同的事件来触发同步操作。
    forceSync := time.NewTicker(forceSyncCycle) // 强制同步触发的计时器。
    defer forceSync.Stop()

    for {
        select {
        // 有新建的远程节点。
        case <-pm.newPeerCh:
            // 确保有节点可以选择,然后同步。数量最低为5个。
            if pm.peers.Len() < minDesiredPeerCount {
                break
            }
            go pm.synchronise(pm.peers.BestPeer())

        case <-forceSync.C:
            // 没有足够数量的节点,也进行强制同步。
            go pm.synchronise(pm.peers.BestPeer())

        case <-pm.noMorePeers:
            return
        }
    }
}

先启动了fetcher,辅助同步区块用的。然后等待不同的事件触发不同的同步方式。

  • 以太坊的节点在新增的相邻节点数比minDesiredPeerCount = 5大的时候,会与相邻的节点进行区块链的同步。
  • 以太坊设置了一个强制同步的机制,每经过forceSyncCycle 10秒,则会与相邻节点进行同步。

同步的过程调用pm.synchronise方法来进行。

func (pm *ProtocolManager) synchronise(peer *peer) {
    ...
    // 确保远程节点比本地区块链要新。通过难度值td来做比较。
    currentBlock := pm.blockchain.CurrentBlock()
    td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())

    pHead, pTd := peer.Head()
    ...
    // 设置同步的方式。
    ...
    // 运行同步循环,与下载器同步。如果本地已有枢纽区块数据则取消快速同步的方式。
    if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
        return
    }
    ...
    // 完成了同步之后,将本地区块链的高度最高的区块广播出去,给其他节点广播的是区块哈希。
    if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
        go pm.BroadcastBlock(head, false)
    }
}
  • 1.先确定远程节点的区块链是否比本地节点的区块链新,新指的是区块链头的难度值更大。

  • 2.然后确定同步的模式,并且下载区块。

    • 如果是刚刚启动的本地节点,即本地的区块链数据库里没有数据,可以使用快速同步模式。
    • 如果同步过程中已经通过the pivot block枢纽块,则禁用快速同步。

    (同步中具体获得区块的过程是通过pm.downloader下载器实现的。)

  • 3.同步完成之后,向其他节点广播本地区块链目前的状态,即将区块链头的区块哈希广播出去。pm.BroadcastBlock(head, false),标志位为false,将区块的哈希广播出去。

2.同步交易

ProtocolManager协议管理中的go pm.txsyncLoop()协程。

同步交易循环 txsyncLoop 分为三个部分的内容:

1.定义初始化的字段。
var (
        pending = make(map[enode.ID]*txsync)
        sending = false              
        pack    = new(txsync)         // the pack that is being sent
        done    = make(chan error, 1) // result of the send
    )
  • pending,未处理的交易。一个pending包含很多个txsynctxsync由远程节点和交易列表组成。
  • sending,交易发送的标志位。
  • pack,交易包,正在发送中的交易。
  • done,完成交易发送的标志位。
2.定义发送交易和挑选函数。

发送交易的函数。

send := func(s *txsync) {
    // 根据设置的长度,填充pack交易包。
    size := common.StorageSize(0)
    pack.p = s.p
    pack.txs = pack.txs[:0]
    for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
        pack.txs = append(pack.txs, s.txs[i])
        size += s.txs[i].Size()
    }
    // 移除将要发送的交易。
    s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
    if len(s.txs) == 0 {
        delete(pending, s.p.ID())
    }
    // 后台发送交易包。
    s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
    sending = true
    go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
  • 1.根据 txsyncPackSize 设置的交易编码长度,打包一定数量的交易到pack里。
  • 2.打包的交易即将要发送的交易,在s中移除这些交易。
  • 3.设置sending标志为true,开启一个单独的协程发送打包的交易,发送完成后,done通道会有数据。

挑选函数。

pick := func() *txsync {
    if len(pending) == 0 {
        return nil
    }
    n := rand.Intn(len(pending)) + 1
    for _, s := range pending {
        if n--; n == 0 {
            return s
        }
    }
    return nil
}
  • 取出pending里最后一个交易包。
3.执行发送交易的循环。
for {
    select {
    case s := <-pm.txsyncCh:
        pending[s.p.ID()] = s
        if !sending {
            send(s)
        }
    case err := <-done:
        sending = false
        if err != nil {
            pack.p.Log().Debug("Transaction send failed", "err", err)
            delete(pending, pack.p.ID())
        }
        // 计划下次发送。
        if s := pick(); s != nil {
            send(s)
        }
    case <-pm.quitSync:
        return
    }
}

三个监听协程的case

  • 1.case s := <-pm.txsyncCh:,交易同步通道里有交易后触发,在pending里加入监听到的交易。
  • 2.case err := <-done:,完成发送交易的时候触发,设置sending标志位为false。然后计划下一次的交易发送。
  • 3.case <-pm.quitSync:,退出。

3.总结

  • 1.区块同步有两方面的作用:新的本地节点启动的时候,没有区块链的数据,通过同步从相邻节点处获取到相邻节点的最佳区块链;每10秒一次进行的同步,可以确保本地区块链数据记录的时时更新,确保自身区块链保持最佳状态。
  • 2.交易同步:未处理的交易,可能相邻节点没有,比如说刚启动的相邻节点,将本地的未处理交易均匀发送给相邻节点。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容