NSQ 源码学习笔记(一)

首先我们来看一下Nsq的组织结构:

  • nsqd:接收,分发队列信息的守护进程,可以单独部署,也可以集群化运行
  • nsqlookupd:管理nsqd节点,服务发现
  • nsqadmin:nsq的可视化管理工具

NSQ的拓补图

@拓扑图 | center

NSQ中Topic和channel的关系

Topic会将消息发送到每个订阅者(channel)
channel的读消费类似负载均衡,会均匀的投递到各个消费端

@Topic和channel的关系 | center

三个模块中nsqd模块最为重要,我们从这个模块开始学习它的源码

入口函数

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
var cfg config
configFile := flagSet.Lookup("config").Value.String()
if configFile != "" {
    _, err := toml.DecodeFile(configFile, &cfg)
    if err != nil {
        log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error())
    }
}
cfg.Validate()

opts := nsqd.NewOptions()
options.Resolve(opts, flagSet, cfg)
nsqd := nsqd.New(opts)

nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
if err != nil {
    log.Fatalf("ERROR: failed to persist metadata - %s", err.Error())
}
nsqd.Main()
<-signalChan
nsqd.Exit()
  1. 首先用 signal.Notify 阻塞系统的 killctrl+c 信号,让进程可以处于deamon的状态运行
  2. 按优先级合并配置文件:命令行 > 配置文件 > 默认值
  3. nsqd.LoadMetadata 读取dat文件,加载topic和channel信息,并同步运行和停止的状态
  4. 将进程的运行状态(topic和channel信息)持久化到dat文件中
  5. 执行 nsqd.Main 直到捕捉退出信号

nsqd.Main 的代码位于 nsqd/nsqd.go

NSQD主函数(TCP监听)

func (n *NSQD) Main() {
    var httpListener net.Listener
    var httpsListener net.Listener

    ctx := &context{n}

    tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.tcpListener = tcpListener
    n.Unlock()
    tcpServer := &tcpServer{ctx: ctx}
    n.waitGroup.Wrap(func() {
        protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
    })
    ...
}

  NSQD首先启动了tcp监听模型,为了保证通用性,在 protocol 包中封装了TCPServer,需要传入 Listener, TCPHandler, Logger 对象。所有的TCP监听均可以用这个模式来创建监听,只要传入对应的 ListenerTCPHandler ,那么Listener在Accept到Connect的时候,将其交给对应TCPHandler.Handle(clientConn) 执行。

TCPHandler 的Interface实现

package protocol

type TCPHandler interface {
    Handle(net.Conn)
}

func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }

        // 启动Goroutine 去执行Handle函数
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

  这里体现了Go在实现Interface的便捷之处,不需要显示的声明实现了某个Interface,只需要完全的实现Interface中定义的方法,那么就会默认该类型实现了接口。在这里不同的Handler,只要实现了Handle(net.Conn),就可以被当做TCPHandler对象传入。在代码中的体现是:
  执行Handle函数时是启动一个Goroutine来执行的,这里其实是per connect per goroutine,由于Golang的特性,Goroutine在执行时的调度模式是epoll模式,可以很好的利用系统的多核资源。

main函数中TCPServer的实现

type tcpServer struct {
    ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
    p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

    // 客户端应该初始化本身通过发送一个4字节序列表示协议的版本,
    // 这样将允许我们优雅地升级兼容协议
    buf := make([]byte, 4)
    _, err := io.ReadFull(clientConn, buf)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
        return
    }
    protocolMagic := string(buf)

    p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
        clientConn.RemoteAddr(), protocolMagic)

    var prot protocol.Protocol
    switch protocolMagic {
    case "  V2":
        prot = &protocolV2{ctx: p.ctx} // V2版本的协议操作
    default:
        protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
        clientConn.Close()
        p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
            clientConn.RemoteAddr(), protocolMagic)
        return
    }

    err = prot.IOLoop(clientConn)
    if err != nil {
        p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
        return
    }
}

  源码中标记了需要在通讯时预留4个字节的版本号信息,用来兼容协议的升级。如果未来有协议升级,只需要在protocolMagic中添加新的case分支就可以了。

NSQD主函数(HTTP/HTTPS监听)

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig)
        if err != nil {
            n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err)
            os.Exit(1)
        }
        n.Lock()
        n.httpsListener = httpsListener
        n.Unlock()
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger)
        })
    }
    httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress)
    if err != nil {
        n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err)
        os.Exit(1)
    }
    n.Lock()
    n.httpListener = httpListener
    n.Unlock()
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger)
    })

  这里不论是http还是https的监听,httpsServerhttpServer作为Handler对象,均在内部声明了路由规则,不同的请求定义了不同的操作,最后通过http_api.Serve()绑定端口监听

NSQD默认自启的操作

    n.waitGroup.Wrap(func() { n.queueScanLoop() }) // 循环消息分发
    n.waitGroup.Wrap(func() { n.idPump() }) // 生产唯一消息id的一个队列
    n.waitGroup.Wrap(func() { n.lookupLoop() }) // 如果nsqd有变化,同步nsqlookup
    if n.getOpts().StatsdAddress != "" {
        // 定时将nsqd的状态以短连接的方式发送至一个状态监护进程.包括了nsqd的应用资源信息,以及nsqd上topic的信息
        n.waitGroup.Wrap(func() { n.statsdLoop() })
    }

  启动监听后,除了通过监听启动的操作外,NSQD还有一些类似守护进程的操作会一直运行,包括:

  • 循环消息分发
  • 生产唯一消息ID
  • nsqlookup的状态同步
  • 状态监控
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • 转载自http://blog.csdn.net/qq295445028/article/details/79930...
    WebSSO阅读 2,917评论 0 3
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 6,574评论 2 39
  • https://nodejs.org/api/documentation.html 工具模块 Assert 测试 ...
    KeKeMars阅读 6,320评论 0 6
  • 记得读大学的时候因为上了一次电影欣赏的选修课,知道了中国第五代,第六代导演,知道了蒙太奇,当时觉得说要把世界上的经...
    生命是一次馈赠的旅行阅读 1,113评论 0 0