[以太坊源码分析][p2p网络03]:发起TCP连接请求


上一节介绍的是底层p2p网络开启后,监听别的远程节点发送来的TCP连接请求。这一节是个续集,要介绍的是本地节点如何向远程节点发起TCP连接请求

这一次,是我们打电话call别人。但是这一次有点不同,我们是以做任务的形式向别人发起拨号,而且每次做很多个任务。每一个任务里都包含了连接类型远程节点信息

究竟是怎么回事,请往下看。

0.索引

01.从Server服务开始
02.初始化拨号状态,以及创建任务
03.计划任务和开启任务
04.Do 执行任务
05.总结

1.从Server服务开始

Server服务启动,也开始了拨号

在上图中,主要是看第3个步骤,初始化拨号状态,和第6个步骤,开始拨号。(这里提一下,监听连接发起连接是两个单独的协程,所以并不是监听后再发起连接。)

2.初始化拨号状态,以及创建任务

dialstate 拨号状态
dialstatep2p/dial.go中的核心结构体,管理拨号(发起TCP连接请求)和查找节点的操作。
通过newDialState来新建它。关于dialstate字段的含义在下方的注释中。

func newDialState(static []*enode.Node, bootnodes []*enode.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
    s := &dialstate{
        maxDynDials: maxdyn,                                    // 最大的拨号任务数量
        ntab:        ntab,                                      // k桶                                        
        netrestrict: netrestrict,                               // ip网络的列表
        static:      make(map[enode.ID]*dialTask),              // 静态节点
        dialing:     make(map[enode.ID]connFlag),               // 拨号中,connFlag有4种拨号类型
        bootnodes:   make([]*enode.Node, len(bootnodes)),       // 初始引导节点
        randomNodes: make([]*enode.Node, maxdyn/2),             // 在k桶种随机查找节点,数量为最大拨号任务数量的二分之一
        hist:        new(dialHistory),                          // 记录最近的拨号
    }
    // 加入初始引导节点
    copy(s.bootnodes, bootnodes)
    // 加入静态节点
    for _, n := range static {
        s.addStatic(n)
    }
    return s
}

其中加入了两种节点,bootnodesstaticbootnodes是初始引导节点,在节点没有接收到任何节点的连接请求,也没有节点可以给我们邻居节点的时候,就去连接bootnodes,它硬编码在了以太坊的源码中。static是静态节点,如果我们想和某些节点保持长期的连接,就把它们加入到静态节点的列表中。

newTasks 新建任务
新建任务就是将某一些远程节点打包成任务,(一个任务对应一个远程节点),最终返回一个任务列表。执行任务就是给任务中的远程节点发起TCP连接请求。

以下是新建任务的流程图:

新建任务
  • 1.设置最大的任务数量,这个是是由节点最大连接数除以拨号比率得出的,即maxPeers/radio得到。
    needDynDials := s.maxDynDials
    
  • 2.判断peers里是否有已经建立连接的节点,peers是向本地节点发来连接请求的远程节点的集合。记录数量,最大任务数量减去这个数。
    for _, p := range peers {
        if p.rw.is(dynDialedConn) {
            needDynDials--
        }
    }
    
  • 3.判断服务中是否有正在拨号的节点。记录数量,最大任务数量减去这个数。
    for _, flag := range s.dialing {
        if flag&dynDialedConn != 0 {
            needDynDials--
        }
    }
    
  • 4.向设置的静态节点s.static发起连接请求,这个不消耗最大任务数量。
  • 5.如果发来连接请求的远程节点集合peers为空,并且经过了设置的时间fallbackInterval20s,会随机的连接一个引导节点bootnode。最大任务数量减1。
    if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
        // 将第一个bootnode放在列表最后,使每一次取的bootnode都是不一样的。
        bootnode := s.bootnodes[0]
        s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
        s.bootnodes = append(s.bootnodes, bootnode)
    
        if addDial(dynDialedConn, bootnode) {
            needDynDials--
        }
    }
    
  • 6.ReadRandomNodes随机的从UDP节点发现中使用Kad算法维护的k桶里,提取randomCandidates个节点。randomCandidates为最大任务数量的二分之一。(可能会提取不到这么多个,实际提取到的数量为n。)
    最大任务数量减去n
    randomCandidates := needDynDials / 2
    if randomCandidates > 0 {
        n := s.ntab.ReadRandomNodes(s.randomNodes)
        for i := 0; i < randomCandidates && i < n; i++ {
            if addDial(dynDialedConn, s.randomNodes[i]) {
                needDynDials--
            }
        }
    }
    
  • 7.如果还不满足最大任务数量的话,从s.lookupBuf里提取,直到达到最大任务数量。s.lookupBuf里的节点也是通过Kad获取节点的方式获取到的,如果s.lookupBuf里节点数量不够,则创建发现任务discoverTask 进行节点发现,填充s.lookupBuf
    // 从lookupBuf里提取节点。
    i := 0
    for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
        if addDial(dynDialedConn, s.lookupBuf[i]) {
            needDynDials--
        }
    }
    // 去掉被提取出来的节点。
    s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]    
    // 数量不够的话,进行节点发现。
    if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
        s.lookupRunning = true
        newtasks = append(newtasks, &discoverTask{})
    }
    
  • 8.如果没有需要执行的任务,会执行等待任务waitExpireTask,也就是,保持拨号逻辑继续运行。
    if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
          t := &waitExpireTask{s.hist.min().exp.Sub(now)}
          newtasks = append(newtasks, t)
      }
    

上述过程,即完成了一次任务的创建,结果是得到一个任务列表newtasks

3.计划任务和开启任务

server.gorun(dialstate dialer)
先来看一下Server服务中关于任务的计划和执行的流程图:

发起TCP连接请求

  • 1.首先是对字段进行初始化,例如,发来请求连接的远程节点列表peers,接入的连接数inboundCount。最主要的是定义了两种任务列表,runningTasks运行中的任务列表,和queuedTasks排队等待中的任务列表。
    • runningTasks是指执行一个任务(即发起一个TCP连接请求),就将该任务加入到runningTasks列表中。完成任务后移除。
    • queuedTasks是指新建了任务后,将任务加入到queuedTasks列表中,queuedTasks中的任务被执行时从queuedTasks中移除,加入到runningTasks中。
  • 2.定义了三种对任务进行的操作:scheduleTasks计划任务,startTasks开始任务,delTask删除任务。
    • delTaskrunningTasks移除给定的单个任务。
    delTask := func(t task) {
       // 循环查找到该任务,然后移除。
       for i := range runningTasks {
           if runningTasks[i] == t {
               runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
               break
           }
       }
    }
    
    • startTasks 如果运行中的任务数量没有达到maxActiveDialTasks最大活跃的任务数量(默认为16个),则开始执行任务。每一个任务都是一个单独的线程。任务的执行通过调用t.Do(srv)进行,任务完成后将任务传入taskdone通道。
      执行中的任务加入runningTasks列表中。最终返回ts列表中未执行的任务。
    startTasks := func(ts []task) (rest []task) {
         i := 0
         for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
             t := ts[i]
             srv.log.Trace("New dial task", "task", t)
             // 分配线程,开始执行任务。
             go func() { t.Do(srv); taskdone <- t }()
             // 运行任务列表中加入该任务。
             runningTasks = append(runningTasks, t)
         }
         return ts[i:]
    }
    
    • scheduleTasks 先执行queuedTasks列表中的任务,queuedTasks列表中未被执行的任务将被保留。
    • 然后如果运行中的任务数量没有达到最大活跃的任务数量,则调用dialstate.newTasks新建任务,接着执行刚刚新建任务。未被执行的任务也加入到queuedTasks列表中,等待循环的下一次执行。
    scheduleTasks := func() {
      // 执行queuedTasks列表中的任务。
      queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
      // 如果运行中的任务数量没有达到最大的拨号数量
      if len(runningTasks) < maxActiveDialTasks {
          // 新建拨号任务
          nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
          // 先执行新建的任务,新建的任务中未被执行的任务加入到queuedTasks列表中
          queuedTasks = append(queuedTasks, startTasks(nt)...)
      }
    }
    
  • 3.主要的处理循环:
    • 开启了计划任务函数,由于开始任务函数包含在计划函数里,所以这里开始了新建任务以及并发的执行任务。running是运行与否的标志位。
      running:
        for {
            scheduleTasks()
            // 处理消息
            ...
        }
      
    • 接下来是一个内容很多的select case结构,处理接收到的内容。
    • 比如说,case n := <-srv.addtrusted: 触发后,将某个节点标记为受信任的节点。case n := <-srv.removetrusted: 触发后,移除某个受信任的节点。
    • 比较重要的是case c := <-srv.addpeer:,需要新建远程节点的时候触发。也是说这个case会在上一篇中介绍的节点协议握手成功之后,srv.addpeer的通道中加入与远程节点的连接的时候触发。
      case c := <-srv.addpeer:
          // 协议握手检查
          err := srv.protoHandshakeChecks(peers, inboundCount, c)
          if err == nil {
              // 握手完成,通过所有检查。
              p := newPeer(c, srv.Protocols)
              ...
              // 执行远程节点。
              go srv.runPeer(p)
              // 加入连接请求的peers列表。
              peers[c.node.ID()] = p
              // 接入节点数加1。
              if p.Inbound() {
                  inboundCount++
              }
          }
          select {
          case c.cont <- err:
          case <-srv.quit:
              break running
          }
      
  • 4.最后是循环完毕,关闭节点发现,断开与全部节点的连接,并清空peers

4.Do 执行任务

dial.goDo(srv *Server)
上述startTasks开始任务中执行任务的具体过程。

func (t *dialTask) Do(srv *Server) {
    // 判断节点是否完整,不完整的节点表示没有ip地址。
    if t.dest.Incomplete() {
        // 解析,使用Kad的方法查找到该节点的ip地址。
        if !t.resolve(srv) {
            return
        }
    }
    // 拨号
    err := t.dial(srv, t.dest)
    ...
    }
}
  • 先判断节点的完整性,不完整的话解析获取该节点的ip地址。
  • 然后进行拨号。
func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
    // fd是一个连接
    fd, err := srv.Dialer.Dial(dest)
    ...
    return srv.SetupConn(mfd, t.flags, dest)
}
  • dial,拨号,调用了golang自带的net.Dialer.Dial方法建立TCP连接,然后使用srv.SetupConn方法进行加密握手和协议握手。
  • 在上一节监听连接中,srv.SetupConn的第三个传入字段是nil,因为我们是监听连接的,所以还无该节点公钥。这一次是发起请求,所以我们知道该远程节点的公钥,在加密握手之后,可以将我们拥有的公钥与远程节点发来的公钥进行验证。

5.总结

  • 1.节点发起TCP连接请求是通过创建任务,执行任务实现的,以任务的形式,可以更好的控制建立连接的数量,也方便并发的发起连接请求。
  • 2.监听TCP连接和发起TCP请求相辅相成。监听连接负责接收远程节点的TCP连接,以及建立与远程节点的加密通道;发起请求负责向远程节点发送TCP连接请求,以及执行建立了加密通道后的远程节点(的协议)。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350