[以太坊源码分析][p2p网络06]:交易广播和区块广播


eth/handle.go中的ProtocolManager管理节点之间通信。节点与节点之间的通信,也就是区块和交易的广播或同步。

这里先介绍广播。提及广播,要先说一个有趣的协议:gossip,对,就是流言蜚语。如果有关于明星的八卦或是负面新闻,不用多长时间,可能满大街的人们就都知道了。广播就类似于流言蜚语的传播,一传十,十传百的扩散出去,最后整个网络都知晓了。

以下是ProtocolManager实现区块和交易的广播的流程图:

ProtocolManager的实现方法

接下来会一步一步介绍。

0.索引

01.广播和同步的启动
02.区块广播
03.区块广播相关源码
04.交易广播以及源码
05.异步发送区块和交易的说明
06.总结

1.广播和同步的启动

区块和交易的广播与同步由ProtocolManager协议管理控制。启动方法为Start

func (pm *ProtocolManager) Start(maxPeers int) {
    pm.maxPeers = maxPeers

    // 广播交易
    pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
    pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
    go pm.txBroadcastLoop()

    // 广播区块
    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    go pm.minedBroadcastLoop()

    // 开始同步
    go pm.syncer()
    go pm.txsyncLoop()
}

开启了4个协程:

  • 1.创建了新交易事件的通道,然后开始pm.txBroadcastLoop() 广播交易。
  • 2.创建了新区块事件的通道,然后开始pm.minedBroadcastLoop() 广播区块。
  • 3.同步区块,同步的过程在eth/sync.go里,下一次介绍。
  • 4.同步交易。

先从区块广播开始。

2.区块广播

区块广播指的是矿工挖出新的区块后,将新区块告知并发给p2p网络中的所有节点。这里涉及到两个广播过程:

  • 1.矿工广播新区块
  • 2.其他的中继节点广播新区快

如下图:


区块广播

第一轮:
黄色的节点表示矿工,矿工挖到区块后,接下来要将区块广播出去,也就是发送给相邻的节点,这里相邻的节点有5个,两个红色的节点和三个蓝色的节点。红色的节点表示收到区块的节点,蓝色的节点表示收到区块哈希的节点。

这里红色的节点是有一定数量要求的。取的是,要广播的节点数量的平方根。要广播5个节点,5取平方根再取整为2个。也就是说矿工向这两个红色节点直接发送了区块,然后向剩余的节点发送了区块哈希

第二轮:
接收到区块哈希的蓝色节点向发来区块哈希的节点(也就是矿工)请求下载区块,下载完区块后,就跟接收到区块的红色节点一样,向它的相邻节点发送区块区块哈希,如第一轮的过程。

这其中会有一种情况产生,如果提前接收到了未来的区块,比如说,区块A->区块B->区块C,需要的是区块B,但是接收到了区块C的情况,这时候会将区块哈希进行广播。

第n轮:
同第二轮,直到整个网络都知晓广播的区块。

注意:下面的源码介绍的是以矿工挖出区块后的第一次广播,也就是进行第一轮操作。交易广播亦是如此。)

3.区块广播相关源码

首先是区块广播循环minedBroadcastLoop()
func (pm *ProtocolManager) minedBroadcastLoop() {
    // 自动停止,如果退订了通道。
    for obj := range pm.minedBlockSub.Chan() {
        if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
            // 广播区块。
            pm.BroadcastBlock(ev.Block, true)  // 首先广播区块。
            pm.BroadcastBlock(ev.Block, false) // 然后只广播区块哈希。
        }
    }
}

区块广播循环minedBroadcastLoop()开启了之后,会一直读取区块事件的通道,也就是如果有新的区块事件产生,就能立即知晓。

然后进行区块广播。调用了两次pm.BroadcastBlock方法。第一次标志位为true,给部分节点广播区块。第二次标志位为false,只广播区块哈希。

关于pm.BroadcastBlock方法。

根据propagate标志位的不同设置,对应不同的区块广播方式。

func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool)
  • 1.先获取新的区块的哈希hash,和本地节点的相邻节点中,未知这个区块的节点列表peers
    hash := block.Hash()
    peers := pm.peers.PeersWithoutBlock(hash)
    
  • 2.如果 propagate 字段为true。广播区块,节点的数量为peers的长度的平方根。
    transferLen := int(math.Sqrt(float64(len(peers))))
    transfer := peers[:transferLen]
    for _, peer := range transfer {
      peer.AsyncSendNewBlock(block, td)
    }
    
  • 3.如果 propagate 字段为false。广播区块哈希,节点的数量为peers剩余的节点数量。(即没有收到区块的节点。)
    for _, peer := range peers {
      peer.AsyncSendNewBlockHash(block)
    }
    
异步发送区块或区块哈希

(代码在eth/peer.go里)
发送区块。
在远程节点的广播队列里加入了区块事件,如果远程节点的广播队列 queuedProps 满了,则无法收到。然后标注该远程节点已知该区块。

func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
    select {
    case p.queuedProps <- &propEvent{block: block, td: td}:
        p.knownBlocks.Add(block.Hash())
    default:
        p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
    }
}

发送区块哈希。
与发送区块类似。在远程节点的区块哈希通知队列里加入了区块事件。然后标注该远程节点已知该区块。

func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
    select {
    case p.queuedAnns <- block:
        p.knownBlocks.Add(block.Hash())
    default:
        p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
    }
}

4.交易广播以及源码

由于交易的数量比较多,所以每次广播的是一批交易。交易的广播也相对比较简单,一批新的交易,直接传给相邻节点即可。

交易广播循环 txBroadcastLoop()

循环读取交易事件通道,如果接收到新的一批交易,则广播出去。

func (pm *ProtocolManager) txBroadcastLoop() {
    for {
        select {
        case event := <-pm.txsCh:
            pm.BroadcastTxs(event.Txs)
        case <-pm.txsSub.Err():
            return
        }
    }
}
pm.BroadcastTxs 方法

广播交易,由于是一批交易,所以要先知道相邻的节点缺少这一批交易里的哪一些交易。定义了交易集合的映射,即远程节点对应该远程节点缺少的交易列表。然后发送交易。

func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
    var txset = make(map[*peer]types.Transactions)

    // 广播给无该交易的节点
    for _, tx := range txs {
        peers := pm.peers.PeersWithoutTx(tx.Hash())
        for _, peer := range peers {
            txset[peer] = append(txset[peer], tx)
        }
        log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
    }
    // 发送交易
    for peer, txs := range txset {
        peer.AsyncSendTransactions(txs)
    }
}
异步发送交易

在远程节点的交易队列里加入了交易事件,如果远程节点的交易队列 queuedTxs 满了,则无法收到。然后标注该远程节点已知该交易。(一批交易。)

func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
    select {
    case p.queuedTxs <- txs:
        for _, tx := range txs {
            p.knownTxs.Add(tx.Hash())
        }
    default:
        p.Log().Debug("Dropping transaction propagation", "count", len(txs))
    }
}

5.异步发送区块和交易的说明

在进行区块广播和交易广播的时候,都是采用异步发送的形式,每个远程节点都设置了三个广播的通道,queuedProps,区块通道,缓存为4个区块;queuedAnns,区块哈希通道,缓存为4个区块哈希;queuedTxs交易通道,缓存为128个交易。

需要进行区块或交易的广播的时候,将区块或交易放入远程节点相应的通道中。

远程节点读取通道内容

(代码在eth/peer.go里)

在上层网络的peerSet中加入新的远程节点,也就是Register注册节点的时候,会开一个单独的协程,启动远程节点的广播方法,即go p.broadcast()

func (ps *peerSet) Register(p *peer) error {
    ...
    ps.peers[p.id] = p
    go p.broadcast()
    return nil
}

go p.broadcast()方法,是一个异步读取循环,每次从交易通道,或区块通道,或区块哈希通道中读取内容,然后执行对应的发送方法。

func (p *peer) broadcast() {
    for {
        select {
        case txs := <-p.queuedTxs:
            if err := p.SendTransactions(txs); err != nil {return}
        ...
        case prop := <-p.queuedProps:
            if err := p.SendNewBlock(prop.block, prop.td); err != nil {return}
        ...

        case block := <-p.queuedAnns:
            if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {return}
        ...
        case <-p.term:
            return
        }
    }
}

这里以发送区块为例,先标注该远程节点已知该区块,然后调用p2p.Send方法,将区块发送给远程的节点。

func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
    p.knownBlocks.Add(block.Hash())
    return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}

然后是Send方法,将区块数据进行rlp编码后置入r中,size为rlp编码后的数据长度。调用w.WriteMsg方法将要发送的数据写入w通道。

func Send(w MsgWriter, msgcode uint64, data interface{}) error {
    size, r, err := rlp.EncodeToReader(data)
    ...
    return w.WriteMsg(Msg{Code: msgcode, Size: uint32(size), Payload: r})
}

6.总结

  • 1.区块和交易的广播是一种gossip的传播方式,每个节点都向相邻的节点传播,最后蔓延开去,整个p2p网络就都知晓了广播的消息。
  • 2.区块广播有两个内容,分别为区块和区块哈希的广播。
  • 3.交易广播的对象是一批交易。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容