[以太坊源码分析][p2p网络05]:底层节点如何与上层节点联系


对于以太坊的p2p网络,我觉得,分为底层p2p网络的构建,以及上层eth服务的实现。在介绍以太坊上层服务之前,需要先来看一下底层网络是怎么跟上层网络联系在一起的。

0.索引

01.ProtocolManager 协议管理
02.新建一个 ProtocolManager
03.建立联系
04.使用 Run 方法
05.底层的peer结构
06.总结

1.ProtocolManager 协议管理

ProtocolManagereth/handle.go中的核心结构体,用来管理节点之间的通信。

type ProtocolManager struct {
    networkID uint64
    fastSync  uint32 
    acceptTxs uint32 

    txpool      txPool
    blockchain  *core.BlockChain
    chainconfig *params.ChainConfig
    maxPeers    int

    downloader *downloader.Downloader
    fetcher    *fetcher.Fetcher
    peers      *peerSet

    SubProtocols []p2p.Protocol

    eventMux      *event.TypeMux
    txsCh         chan core.NewTxsEvent
    txsSub        event.Subscription
    minedBlockSub *event.TypeMuxSubscription

    // fetcher, syncer, txsyncLoop 的通道
    newPeerCh   chan *peer
    txsyncCh    chan *txsync
    quitSync    chan struct{}
    noMorePeers chan struct{}

    wg sync.WaitGroup
}

ProtocolManager结构体中包含了

  • networkID网络id。
  • fastSync快速同步的标志, acceptTxs接收交易方式的标志。
  • txpool交易池,blockchain区块链,chainconfig区块链配置,maxpeers最大节点数。
  • downloader下载器,fetcher提取器,peers相邻节点表。
  • SubProtocols子协议列表。(与底层节点相关。)
  • 以及需要用到的各种通道和同步锁。

2.新建一个 ProtocolManager

底层节点与上层节点的联系,就在新建一个ProtocolManager的方法里。(第3个步骤,再往下会做具体说明。)

func NewProtocolManager(
    config *params.ChainConfig, 
    mode downloader.SyncMode, 
    networkID uint64, 
    mux *event.TypeMux, 
    txpool txPool,
    engine consensus.Engine, 
    blockchain *core.BlockChain, 
    chaindb ethdb.Database
) (*ProtocolManager, error) 
  • 1.初始化基础字段。
    manager := &ProtocolManager{networkID:   networkID,
      eventMux:    mux,
      txpool:      txpool,
      blockchain:  blockchain,
      chainconfig: config,
      peers:       newPeerSet(),
      newPeerCh:   make(chan *peer),
      noMorePeers: make(chan struct{}),
      txsyncCh:    make(chan *txsync),
      quitSync:    make(chan struct{}),}
    
  • 2.确认是否快速同步模式。
    if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {...}
    
  • 3.每个实现的版本添加子协议。也就是上层服务给予底层p2p网络调用上层服务的函数入口。(在这一步建立的联系。)
    manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
      for i, version := range ProtocolVersions {...}
    
  • 4.新建一个下载器downloader,用于下载区块。构建不同的同步机制,mode为同步机制类型。
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    
  • 5.validator引用验证区块头的方法。
    validator := func(header *types.Header) error {
          return engine.VerifyHeader(blockchain, header, true)
      }
    
  • 6.heighter引用获取区块高度的方法。
    heighter := func() uint64 {
          return blockchain.CurrentBlock().NumberU64()
      }
    
  • 7.inserter引用在区块链上插入区块的方法。
    inserter := func(blocks types.Blocks) (int, error) {
          ...
          return manager.blockchain.InsertChain(blocks)
      }
    
  • 8.新建一个提取器fetcher,用于辅助同步区块。
    manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
    

3.(添加子协议)建立联系

添加子协议的具体步骤。

manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
    Name:    ProtocolName,
    Version: version,
    Length:  ProtocolLengths[i],
    Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
        peer := manager.newPeer(int(version), p, rw)
        select {
        case manager.newPeerCh <- peer:
            manager.wg.Add(1)
            defer manager.wg.Done()
            return manager.handle(peer)
        case <-manager.quitSync:
            return p2p.DiscQuitting
        }
    },
    NodeInfo: func() interface{} {
        return manager.NodeInfo()
    },
    PeerInfo: func(id enode.ID) interface{} {
        if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
            return p.Info()
        }
        return nil
    },
})

在新建一个p2p.Protocol对象的时候,

  • 传入三个字段:Name协议名,Version协议版本,Length协议长度。
  • 新建三个远程节点的回调函数:
    • Run执行协议。
    • NodeInfo返回了本地节点的网络id,区块难度值,创世区块哈希值,区块链配置,当前区块哈希值。
    • PeerInfo远程节点的信息。

Run方法manager.newPeer方法使得底层的peer能创建一个上层的peer,并且自身包含在上层的peer里。创建了上层的peer后,调用了manager.handle(peer)方法开始处理远程节点发来的消息。

4.调用子协议的 Run 方法 (包含步骤5)

在发起TCP连接请求的那一篇里,提到了:
在节点协议握手成功之后,srv.addpeer的通道中加入与远程节点的连接。
这时候会触发 case c := <-srv.addpeer:
(代码在p2p/server.go中)

case c := <-srv.addpeer:
    // 对协议握手进行一次检查
    err := srv.protoHandshakeChecks(peers, inboundCount, c)
    if err == nil {
        // 协议握手完成,通过检查。
        // 新建底层peer。
        p := newPeer(c, srv.Protocols)
        // 如果启用了消息事件,请将peerFeed传递给peer
        if srv.EnableMsgEvents {
            p.events = &srv.peerFeed
        }
        name := truncateName(c.name)
        srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
            // 启动一个单独的协程,运行节点。
            go srv.runPeer(p)
            // 接收请求的节点集合加入该节点。
            peers[c.node.ID()] = p
            if p.Inbound() {
                // 接入连接的数量加1.
                inboundCount++
            }
      }
  • 1.先对协议握手进行检查。
  • 2.协议握手检查通过后,新建一个底层的peer
  • 3.启动一个单独的协程,来执行这一个peergo srv.runPeer(p)。先进行节点添加这一事件的广播,然后调用p2p/peer.gopeer对象的run方法。
    func (srv *Server) runPeer(p *Peer) {
      // 测试。
      ...
      // 广播节点添加事件。
      srv.peerFeed.Send(&PeerEvent{
          Type: PeerEventTypeAdd,
          Peer: p.ID(),
      })
      // 执行协议·
      remoteRequested, err := p.run()
      // 广播节点下线
      srv.peerFeed.Send(&PeerEvent{
          Type:  PeerEventTypeDrop,
          Peer:  p.ID(),
          Error: err.Error(),
      })
      // 删除节点
      srv.delpeer <- peerDrop{p, err, remoteRequested}
    }
    
  • 4.在peers里加入该peer,如果接入成功,接入数量加1。

5.底层的peer结构以及run方法

(在p2p/peer.go中)
首先是底层peer结构体。它代表一个远程节点的连接。包含了rw建立的TCP连接,running协议对应的读写通道。

type Peer struct {
    rw      *conn                 // 建立的TCP连接
    running map[string]*protoRW   // 协议对应的读写通道
    log     log.Logger            // 日志记录
    created mclock.AbsTime

    wg       sync.WaitGroup
    protoErr chan error
    closed   chan struct{}
    disc     chan DiscReason

    // 接收消息发送/接收事件
    events *event.Feed
}

然后是底层peerrun方法,也就是一个底层节点会执行的所有操作。

func (p *Peer) run() (remoteRequested bool, err error) {
    // 定义变量。
    ...
    // 启动了两个单独的协程,一个用于循环的读取消息,一个用于循环发送ping消息,确保对方节点在线。
    p.wg.Add(2)
    go p.readLoop(readErr)
    go p.pingLoop()

    // 开启所有协议。
    writeStart <- struct{}{}
    p.startProtocols(writeStart, writeErr)

    // 等待接收到错误或者是断开连接。
loop:
    ...
    // 关闭节点。
    close(p.closed)
    p.rw.close(reason)
    p.wg.Wait()
    return remoteRequested, err
}

    1. 启动了两个单独的协程,go p.readLoop(readErr)用于循环的读取消息,go p.pingLoop()用于循环发送ping消息,确保对方节点在线。
  • 2.执行节点包含的所有协议,即p.startProtocols(writeStart, writeErr)方法。在p.startProtocols方法中调用了底层网络设置的回调函数Run(以下程序语句)。
    err := proto.Run(p, rw)
    

步骤4和步骤5一步一步的执行和调用,最后使用了p2p.ProtocolRun方法。

6.总结

  • 1.底层peer与上层peer通过p2p.Protocol对象的Run方法联系在一起。Run实现了新建上层节点,以及与协议版本对应的处理节点之间通信的消息的功能。
  • 2.关于底层p2p网络启动了的协程的总结:
    • Server服务启动了两个协程:监听远程节点发来的TCP请求,发起TCP连接请求。
    • 底层peer运行协议的时候,启动了n+2个协程:循环读取消息,循环发送ping消息,以及n个协议对应的处理方式的协程。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容