nsq服务端源码探索一

nsq官方文档

项目结构欣赏

image.png
  • 1.通讯分析
    nsq运行起来主要组件有3,分别是nsqlookupd,nsqd,nsqadmin。
    官方文档快速开始:
nsqlookupd    //启动nsqlookupd服务
nsqd --lookupd-tcp-address=127.0.0.1:4160   //启动nsqd服务
nsqadmin --lookupd-http-address=127.0.0.1:4161   //启动nsqadmin服务
curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'  //通过http的方式通讯并生产数据到topic为test中去

思考:为什么三个服务构成一个go的nsq消息服务中间件?
答:分工不同,其实核心是nsqd,所有生产者的消息都去nsqd了,所有消费者要消费的消息也是去nsqd拿的。那不是一个nsqd服务就搞定了?nsqd虽然承载了消息队列的核心,但是其自身为了更好的横向拓展,都是不直接跟外界交流通讯的(虽然也可以直接通讯但不推荐),nsqd会把自己的信息上报到nsqlookupd,意思就是所有的消费者和生产者都不会直接去找nsqd服务,带着自己要找的topic和channel去找nsqlookupd,然后nsqadminupd回复他们你的要找的topic在哪一个nsqd上,也就是返回给nsqd的连接信息,然后消费者或者生产者就拿着连接信息去进行tcp连接了。

  • 2.服务管理包svc
    nsqd用到的 svc 这个包来管理所有服务的,同时也统一了代码风格
    1.program定义好后,我们在init里面初始化自己需要的服务代码
    2.在start里面启动我们的业务服务
    3.stop里面执行当我们的服务被kill的时候触发的执行代码(信号很多,这里只枚举了2种通用的)
func main() {
    prg := &program{}     //program结构体就是需要我们自己去定义,然后该结构体需要实现 start,stop,init等方法
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        logFatal("%s", err)
    }
}

nsq那么多服务,使用svc那么服务管理风格就都一致了,没什么好说的。supervisorctl管理的服务记得配置下子进程终止的时间设置(stopwaitsecs=3600),不然stop函数需要执行1小时,supervisorctl可能就等不了那么久了.

  • 3.code风格三板斧
    nsqd的业务代码都在start里面
    1.配置服务专属的配置对象,分2步,初始化自己的配置对象,然后把这个对象放到命令参数解析的函数中去改造,也就是配置对象有些值是可以被命令传参给覆盖
    opts := nsqd.NewOptions() //初始化自己的配置对象
    flagSet := nsqdFlagSet(opts) //试图使用服务启动的参数的值去改造opts的属性值,从而达到灵活配置效果

2.根据opts去new一个本次的业务对象nsqd,后面所有的业务细节的根就都在nsqd这个对象上了,典型的面向对象思维

  nsqd, err := nsqd.New(opts)//获取nsqd对象
  if err != nil {
      logFatal("failed to instantiate nsqd - %s", err)
  }
  p.nsqd = nsqd

  err = p.nsqd.LoadMetadata()//通过默认dat文件加载元数据,将dat记载的topic加载到内存,并去nsqadminupd那里更新channel的信息,有可能topic是属于别的nsqd上的,这就相当于一个加载最新的过程了
  if err != nil {
      logFatal("failed to load metadata - %s", err)
  }
  err = p.nsqd.PersistMetadata() //将最新的原数据写回到随机临时文件,写成功再mv为dat目标文件,这是一个技巧
  if err != nil {
      logFatal("failed to persist metadata - %s", err)
  }

3.nsq的核心

func (n *NSQD) Main() error {
    ctx := &context{n}

    exitCh := make(chan error)
    var once sync.Once
    exitFunc := func(err error) {
        once.Do(func() {
            if err != nil {
                n.logf(LOG_FATAL, "%s", err)
            }
            exitCh <- err
        })
    }

    n.tcpServer.ctx = ctx
    n.waitGroup.Wrap(func() {
        exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    })

    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })

    if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
        httpsServer := newHTTPServer(ctx, true, true)
        n.waitGroup.Wrap(func() {
            exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
        })
    }

    n.waitGroup.Wrap(n.queueScanLoop)
    n.waitGroup.Wrap(n.lookupLoop)  //依次跟nsqaminupd进行tcp拨号连接
    if n.getOpts().StatsdAddress != "" {
        n.waitGroup.Wrap(n.statsdLoop)
    }

    err := <-exitCh
    return err
}

1.Main这个函数其实作用就是先后启动http服务,https服务,tcp服务等。要知道每个服务可能就对应N个goroutine,从nsq这里我看到的是,main启动多个服务,main通过一定技巧达到管理这些服务,至于服务内部的细节(groutinue的分布用该服务自己管),也就是main管理的最小单元是服务,像极了操作系统资源分配最小单元进程,执行单元线程的这种意思了。

怎么做到的?
    httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired)
    n.waitGroup.Wrap(func() {
        exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    })
        ······
        ······
    err := <-exitCh

一个服务的启动分二步

  1. 初始化,也就是new的过程
  2. 服务的启动放到 exitFunc 函数里面托管,而exitFunc又放到 Wrap函数里面托管

exitFunc 保证所有的服务只要有一个服务返回的err不为nil,Main函数就执行完毕了。说白了就是如果你对外提供的服务在你内部是N个小服务组装一起的,只有内部全部正常,我们才能继续使用它对外提供服务,同理只要有一个内部小服务不正常,主协程就得监控到(是退出报警,还是其他。。),这个exitFunc就在扮演这个角色,核心点是 所有的子服务自己异常了自协程主动发信号给主协程的通讯过程,跟我们主协程通过context主动通知子协程去通讯有着通讯方向的区别

Wrap 这个函数可以说是结合 exitFunc 使用的,主协程通过 exitFunc 已经知道 了有一个子协程的异常行为,但是其他小服务还是正常做自己的事情的(这也说明解耦得足够彻底),Wrap这个行为是赋予你的code有能力知道所有的子服务是否全部正常运行完毕的能力,这个信号为依据自然可以做一些主动去通知各个自服务关闭这种友好的退出的行为了

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354