基于NATS实现分布式通信模式


在分布式应用中经常需要实现服务间的通信,本文我们使用NATS消息中间件来实现服务间的不同通信方式。

准备工作

首先创建一个Go项目。注意:本文所介绍的例子运行在Linux/MacOS操作系统环境,但NATS也支持windows系统。

go mod init example

安装nats包:

go get  github.com/nats-io/nats.go/@latest

我们将使用以下目录结构:

.
├── cmd
│   ├── publish-subscribe
│   │   └── main.go
│   ├── request-reply
│   │   └── main.go
│   └── queue-groups
│       └── main.go
├── go.mod
└── go.sum

启动本地nats服务:

docker run -d --name nats-main -p 4222:4222 -p 6222:6222 -p 8222:8222 nats

发布订阅模式

发布订阅模式

NATS实现了消息的发布和订阅一对多模式。发布者在一个主题上发送消息,在该主题上的任何订阅者都可以收到消息。这种1:N一对多模式也称为:fan-out。
订阅者还可以在主题中使用通配符,优点类似正则表达式。例如:

  • foo.*可以匹配foo.bar和foo.baz。
  • fo o.*.bar匹配foo.a.bar和foo.b.bar。
  • foo.>匹配上面所有主题。
    消息大小有限制(在nats服务的max_payload配置参数中设置)。默认是1MB,但可以设置最大为64MB。但NATS开发团队推荐最大值设置小点比如8MB。

为什么需要这种通信模式?

发布订阅是很常见的使用场景,可以用于发送消息到不同的服务。

代码

我们在cmd/publish-subscribe/main.go文件中写该模式代码,首先初始化NATS客户端。

nc, err := nats.Connect(nats.DEFAULT_ENCODER)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

使用3个订阅者订阅foo主题,可以实现一个fan-out模式。

nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

向foo主题发布消息并等待。

if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
        log.Fatal(err)
    }
    time.Sleep(2 * time.Second)

完整例子如下,NATS发布消息非常简单。

package main

import (
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

    if err := nc.Publish("foo", []byte("Here's some stuff")); err != nil {
        log.Fatal(err)
    }
    time.Sleep(2 * time.Second)
}

执行结果

如你所见,消息被发送到所有的订阅者。

go run cmd/publish-subscribe/main.go
2022/05/04 12:07:44 Subscribe 3: Here's some stuff
2022/05/04 12:07:44 Subscribe 1: Here's some stuff
2022/05/04 12:07:44 Subscribe 2: Here's some stuff

请求应答模式

请求应答模式

请求应答(Request-Reply)在分布式系统中也是很常见的通信模式。客户端发送一个请求,会在一定时间内异步等待接收应答消息。
NATS使请求-应答变得简单而强大,并支持一些强大的特性,比如位置透明、扩和缩容、可观察性等等。

为什么需要这种模式?

有时服务间需要一对一的通信,请求应答就非常适合。

代码

在cmd/reques-reply/main.go文件中写该模式的代码,还是以初始化NATS客户端代码开始:

nc, err := nats.Connect(nats.DefaultURL)

if err != nil {
    log.Fatalln(err)
}

defer nc.Close()

订阅foo主题,添加一些日志并对接收到消息时提供应答:

nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Request receive:", string(msg.Data))

        msg.Respond([]byte("Here you go"))
    })

我们还可以使用不同的应答主题,客户端可以向只响应特定请求者的服务发出请求,创建1对1的关系。
下面使用NATS客户端的Request方法。包含三个参数:主题、请求内容(字节数组)、请求超时时间。
以下是完整代码:

package main

import (
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.Subscribe("foo", func(msg *nats.Msg) {
        log.Println("Request receive:", string(msg.Data))

        msg.Respond([]byte("Here you go"))
    })

    reply, err := nc.Request("foo", []byte("Give me data"), 10*time.Second)
    if err != nil {
        log.Fatal(err)
    }
    log.Println("Got reply:", string(reply.Data))
}

执行结果

正如预期的那样,我们的请求收到了,订阅者用一些数据响应了请求。

$ go run cmd/request-reply/main.go
2022/05/04 12:25:21 Request receive: Give me data
2022/05/04 12:25:21 Got reply: Here you go
队列订阅模式
队列订阅模式

NATS提供了一种称为分布式队列功能内置负载平衡。使用队列订阅者将在一组订阅者之间平衡消息发送,这组订阅者可用于提供应用程序容错和扩展工作负载。

为什么需要这种模式

队列订阅是扩展服务的理想选择。扩展就和运行一个新应用程序一样简单,缩容可以向正在运行的应用发送信号来停止服务。这种灵活性和无需任何配置更改特点使NATS成为一种优秀的服务通信技术,可以与所有平台技术一起使用。NATS的一个重要特性是,队列组由应用程序及其队列订阅者组成,而不是在服务器端配置。

代码

要创建一个订阅队列,订阅者需要注册一个队列名。所有包含相同队列名的订阅者组成一个组。无需任何配置。当发布已注册主题的消息时,NATS服务从订阅组中随机选择一个成员接收消息。尽管队列组有多个订阅者,但每个消息只由一个订阅者消费。

我们在cmd/queue-groups/main.go文件中写代码,和前面例子一样先初始化NATS客户端。

nc, err := nats.Connect(nats.DefaultURL)

if err != nil {
    log.Fatalln(err)
}

defer nc.Close()

接下来创建主题为foo的3个队列订阅者,队列名为:queue.foo

nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

最后,创建一个循环像foo主题发布不同的消息,可以看出订阅者是如何消费消息的。

for i:=1; i <= 3; i++{
        message := fmt.Sprintf("Message %d", i)

        if err := nc.Publish("foo", []byte(message)); err != nil {
            log.Fatal(err)
        }
    }

以下是完整代码:

package main

import (
    "fmt"
    "github.com/nats-io/nats.go"
    "log"
    "time"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 1:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 2:", string(msg.Data))
    })

    nc.QueueSubscribe("foo", "queue.foo", func(msg *nats.Msg) {
        log.Println("Subscribe 3:", string(msg.Data))
    })

    for i:=1; i <= 3; i++{
        message := fmt.Sprintf("Message %d", i)

        if err := nc.Publish("foo", []byte(message)); err != nil {
            log.Fatal(err)
        }
    }

    time.Sleep(2 * time.Second)
}

执行结果:

可以看到消息被随机地发送到不同的订阅者。因此,在某种程度上,NATS可以作为服务的7层负载均衡器。

$ go run cmd/queue-groups/main.go
2022/05/04 12:46:06 Subscribe 3: Message 1
2022/05/04 12:46:06 Subscribe 1: Message 3
2022/05/04 12:46:06 Subscribe 2: Message 2

总结

在本文中,我们研究了不同的通信模式,展示了NATS的实时分布式消息传递功能。此外,JetStream可以与这些模式结合使用,实现持久化消息传递和消息至少一次消费策略。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容