介绍:
NSQ是Go语言编写的,开源的分布式消息队列中间件,具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,同时支持横向扩展,操作友好,是一个成熟的、已在大规模生成环境下应用的产品。
NSQ主要分为四部分组件:
nsqlookupd
:守护进程,用来管理拓扑信息,并提供最终一致性的发现服务。nsqd节点广播了topic和channel信息,客户端可以通过查询nsqlookupd来发现topic-producer-nsqd关系。服务启动后,默认监听了两个端口,一个tcp端口(4160)被nsqd用于广播,一个http端口(4161)被客户端用于发现和管理操作。
nsqd
:守护进程,用来接收、排队和投递消息给客户端。服务启动后,默认监听了两个tcp端口,一个(4150)用来服务客户端,一个(4151)用来提供http端口,此外还可以选择性地监听一个可选的https端口。
nsqadmin
:提供一套WEB界面来管理NSQ集群,查看集群的实时统计情况。
utilities
:基础工具集。
QUICK START:
1.启动nsqlookupd
:
$ nsqlookupd
2.启动nsqd
:
$ nsqd --lookupd-tcp-address=127.0.0.1:4160
3.启动nsqadmin
:
$ nsqadmin --lookupd-http-address=127.0.0.1:4161
4.发布一条信息(如果是第一条,会在集群中创建topic):
$ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
5.输出文件到/tmp
目录下:
$ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
如果这一步报错找不到nsqd,原因在于没有指定--broadcast-address,Mac会默认--broadcast-address=“”。我们需要自己指定--broadcast-address,在本机运行时可以直接=127.0.0.1.但是,如果你的nsqd和lookup是不在同一台机器上,你需要设置成你nsqd现在运行的机器的ip地址,这样等你访问admin的时候才能成功。
添加-broadcast-address参数重新启动nsqd:
nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1
6.继续发布消息:
$ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
$ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
7.浏览器打开网址 http://127.0.0.1:4171/ 查看nsqadmin
界面和相关统计信息,查看之前操作的结果是否符合预期。同时,检查写入/tmp
目录下的日志文件(test.*.log
)中的内容。
关系
- nqslookupd 监听tcp 4160,http 4161
- nqsd 监听tcp 4150,http 4151,连接nqslookupd(tcp 4160)
- nsqadmin 监听http 4171,连接nqslookupd(http 4161)
- producer 连接nqsd(tcp 4150/http 4151)
- consumer 连接nqslookupd(http 4161),连接nqsd(tcp 4150)
使用
在go代码中带入包
import "github.com/nsqio/go-nsq"`
创建生产者并发布消息,要连接nqsd并指定topic
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
err := producer.Publish("topic1", []byte(fmt.Sprintf("test")))
创建消费者,添加处理消息方法,连接nqslookupd(或nqsd)
consumer, err := nsq.NewConsumer("topic1", "channel1", nsq.NewConfig())
consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
fmt.Println("handle msg:", string(msg.Body))
return nil
}))
err := consumer.ConnectToNSQLookupd("127.0.0.1:4161")
//err := consumer.ConnectToNSQD("127.0.0.1:4150")
参考文章: