NSQ源码分析(2)- nsqd消息的推送与订阅

NSQ针对消费者采取消息推送的方式,因为NSQ本身基于内存和diskq,并不能容忍太大的消息的堆积,使用推模式也合情合理。

前一篇我们已经看到了针对一个发送到给定topic后,这个message被复制了多份,发送到了这个topic下的每一个channel中,存在在channel的memeoryMsgChan或者backend中。

消息的订阅与推送

关于消息的推送最重要的是两个文件:nsqd/protocol_v2.go和nsqd/client_v2.go。

当一个客户端与nsqd进程建立了一个tcp链接时,代码会调用protocolV2.IOLoop方法,并新建一个clientV2结构体对象。IOLoop方法会启动一个协程执行messagePump方法。

对于每一个tcp连接,都会有两个协程:运行IOLoop的协程用于接收客户端的请求;运行messagePump的负责处理数据,把数据给客户端clientV2推送给客户端。

整个protocol_v2就是一个比较经典的tcp协议的实现。每当建立一个新的tcp连接,服务器都会建立一个client_v2对象,和启动protocol_v2.messagePump协程,一个client只会订阅一个channel。IOLoop用于接收客户端传来的指令,并进行回复,并通过各个channel和其它的组件通信(包括protocol_v2.messagePump)。详情可以看源代码:github.com/nsqio/nsq/nsqd/protocol_v2.go

我们想要关注的消息的推送可以看messagePump的实现,如下:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var buf bytes.Buffer
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel
    // NOTE: `flusherChan` is used to bound message latency for
    // the pathological case of a channel on a low volume topic
    // with >1 clients having >1 RDY counts
    var flusherChan <-chan time.Time
    var sampleRate int32

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // v2 opportunistically buffers data to clients to reduce write system calls
    // we force flush in two cases:
    //    1. when the client is not ready to receive messages
    //    2. we're buffered and the channel has nothing left to send us
    //       (ie. we would block in this loop anyway)
    //
    flushed := true

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        //IsReadyForMessages会检查InFlightMessages的数目是否超过了客户端设置的RDY,超过后,不再取消息推送,而是强制做flush。
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan: //ticker chan,保证定期flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        case <-client.ReadyStateChan://continue to next iteration:check ready state
        case subChannel = <-subEventChan://收到client的SUB的topic的channel后,更新内存中的subChannel开始推送;只会SUB一个channel
            // you can't SUB anymore
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            //SKIP
        case <-heartbeatChan://heartbeat check
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage() //add mem count
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}

首先,客户端发送一个SUB消息来订阅一个topic下的Channel。protocol_v2.go/protocolV2.SUB中,会往clientV2.go/client.SubEventChan发送一个channel。这里的messagePump便更新了内存中的subChannel开始推送这个订阅的channel的消息。

在循环的开头,messageMsgChan和backendChan都会用这个subChannel对应的channel select它们的消息。每当有一个消息来的时候,首先(1)会调用channel的StartInFlightTimeout,channel会把这个消息加到InFlightPqueue里,这个是以timeout时间作为优先级的优先级队列(最小堆),用于保存发送给客户端但是还没有被确认的消息。
(2)还有会更新client的一些counter信息,如InFlightMessageCount等,根据InFlightMessageCount和RDY比较决定是否继续推送消息。

客户端成功消费一条消息后,会发送一个FIN消息,带上message ID,client会-1 InFlightMessageCount,从channel的InflightMessage中取出这个消息,并向ReadStateChan发送一个消息;如果服务端因为RDY限制停止推送消息,收到这个消息后,也会重新查看是否可以继续推送消息。
或者客户端如果消费失败,也会发送一个REQ的请求,channel会把这个消息从channel的InflightMessage中取出这个消息,重新放入channel。
那如果客户端没有对消息做回复呢?

消息超时的设计与实现

在nsqd.go中,还有一部分重要的实现,queueScanLoop方法中,每隔QueueScanInterval的时间,会从方法cache的channels list中随机选择QueueScanSelectionCount个channel,然后去执行resizePool。这个实现参考了redis的probabilistic expiration algorithm.

参考《Redis设计与实现》9.6 Redis的过期键删除策略,结合了两种策略:

  1. 惰性删除。每次客户端对某个key读写时,会检查它是否过期,如果过期,就把它删掉。
  2. 定期删除。定期删除并不会遍历整个DB,它会在规定时间内,分多次遍历服务器中各个DB,从数据库的expires字典中随机检查一部分键的过期时间,如果过期,则删除。

对于nsqd的channel,它有两个队列需要定时检查,一个是InFlightQueue,一个是DeferredQueue。任何一个有工作做,这个channel就被视为dirty的。
每隔default 100ms(QueueScanInterval),nsqd会随机选择20(QueueScanSelectionCount)个channel扔到workerCh chan之中。
每隔5s,queueScanLoop都会调用resizePool。resizePool可以看做是一个fixed pool size的协程池,idealPoolSize= min(AllChannelNum * 0.25, QueueScanWorkerPoolMax)。这么多的协程的工作就是,对于从workerCh收到的每一个channel,都会调用它的channel.go/channel.processInFlightQueue方法和channel.go/channel.processDeferredQueue方法,任何的变动都会把这次queueScan行为标记为dirty。
每次这20个channel全部都scan完毕后,会统计dirtyNum / QueueScanSelectionNum的比例,如果大于某个预设的阈值QueueScanDirtyPercent,将不会间隔时间,直接开始下一轮的QueueScan。
那么为什么每隔5s要重新调用resizePool呢?这是为了根据最新的allChannelNum给予机会去更新resizePool协程池的协程数。因为PoolSize是NSQD的数据域,是全局的状态,每次调用并不会另外新建一个协程池,而是根据idealSize调整它的大小。这部分代码实现也比较经典,可以学习一下“如何使用Golang实现一个协程池的经典实现,尤其是需要动态调整池大小的需求”。

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
//  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

channel.go/channel.processInFlightQueue的实现比较简单,把channel的InflightPQueue中的message按照超时时间由早到晚把超时时间小于给定时间的消息依次取出,做一些一致性的数据操作后,重新放入channel之中(也会发送TryUpdateReadyState)。processDeferredQueue也是类似的。
这里通过了一定的概率加受控制的并发协程池,清理内存中timeout未被客户端所确认的消息,重新放入队列,保证了消息的可达性。(存在重复消费消息的可能)

经典的GO并发

我们其实可以发现,同一个channel,可能会有很多的client从它的memoryMsgChan和backendChan里select监听消息,因为同一个消息对于Golang的channel来说只会被一个监听者收到,所以,通过这样的机制实现了一定程度上的消费者的负载均衡。
NSQ的代码很适合用Golang的Goroutine, Channel, & Mutex并发的good practice来学习。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345

推荐阅读更多精彩内容