NATS是一个开源的,云原生的消息系统。前面讲过CentOS 7 安装nats server。当NATS作为发布-订阅引擎时,它提供了三种消息传递模式:
- 发布-订阅
- 队列
- 请求-响应
下面简单介绍一下Go中实现这三种消息传递。
一、安装
go get github.com/nats-io/nats.go/
二、导入
import "github.com/nats-io/nats.go"
三、连接Nats服务器
// 直接传入nats服务器端口和地址就可以了
nc, _ := nats.Connect("nats://127.0.0.1:4222")
四、消息传递
订阅者
package main
import (
"fmt"
"github.com/nats-io/nats.go"
"os/signal"
"runtime"
"syscall"
)
func main() {
// 连接Nats服务器
nc, _ := nats.Connect("nats://127.0.0.1:4222")
// 发布-订阅 模式,异步订阅 test1
_, _ = nc.Subscribe("test1", func(m *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
// 队列 模式,订阅 test2, 队列为queue, test2 发向所有队列,同一队列只有一个能收到消息
_, _ = nc.QueueSubscribe("test2", "queue", func(msg *nats.Msg) {
fmt.Printf("Queue a message: %s\n", string(msg.Data))
})
// 请求-响应, 响应 test3 消息。
_, _ = nc.Subscribe("test3", func(m *nats.Msg) {
fmt.Printf("Reply a message: %s\n", string(m.Data))
_ = nc.Publish(m.Reply, []byte("I can help!!"))
})
// 持续发送不需要关闭
//_ = nc.Drain()
// 关闭连接
//nc.Close()
// 阻止进程结束而收不到消息
signal.Ignore(syscall.SIGHUP)
runtime.Goexit()
}
发布者
import (
"fmt"
"github.com/nats-io/nats.go"
"time"
)
func main() {
// 连接Nats服务器
nc, _ := nats.Connect("nats://127.0.0.1:4222")
// 发布-订阅 模式,向 test1 发布一个 `Hello World` 数据
_ = nc.Publish("test1", []byte("Hello World"))
// 队列 模式,发布是一样的,只是订阅不同,向 test2 发布一个 `Hello zngw` 数据
_ = nc.Publish("test2", []byte("Hello zngw"))
// 请求-响应, 向 test3 发布一个 `help me` 请求数据,设置超时间3秒,如果有多个响应,只接收第一个收到的消息
msg, err := nc.Request("test3", []byte("help me"), 3*time.Second)
if err != nil {
fmt.Println(err)
} else {
fmt.Printf("help answer : %s\n", string(msg.Data))
}
// 持续发送不需要关闭
//_ = nc.Drain()
// 关闭连接
//nc.Close()
}
五、subject 通配符
nats-server 在管理 subject 的时候是通过’.’ 进行分割的,server 底层是使用 tree module 分层管理 subject. 此处有两个通配符*
和>
。
*
可以匹配以.
分割的一切。如:
nc.Subscribe("aa.*.cc", func(m *Msg) {})
可以匹配aa.11.cc
、aa.zngw.cc
,但不能匹配aa.11.zngw.cc
>
需要放在通配符最后,匹配后面所有长度。如:
nc.Subscribe("aa.>", func(m *Msg) {})
,这个匹配所有aa.
开送的subject