rabbitmq原理和应用

0.1、索引

https://blog.waterflow.link/articles/1663772504649

RabbitMQ 是一个轻量级且易于部署的消息队列。它支持开箱即用的多种消息传递协议。我们将使用 AMQP(高级消息队列协议)

1、概念

[图片上传失败...(image-25cc4b-1666743473091)]

既然是消息队列,顾名思义,肯定会有生产者生产消息,消费者消费消息,还会有队列用来保存消息,等等。

我们先来看下这些概念:

  • Producer: 将消息推送到rabbitmq交换机的应用
  • Consumer: 从队列读取消息并处理他们的应用
  • Exchange: 交换机负责在Binding和Routing key的帮助下,将消息路由到不同的队列。从上图可以看出rabbitmq有多种类型的交换机
  • Binding: Binding是队列和交换机之间的链接
  • Routing key: 交换机用来决定如何将消息路由到队列的键。可以看做是消息的地址
  • Queue: 存储消息的缓冲区
  • Connection:生产者到Broker(rabbitmq服务),消费者到Broker的连接
  • Channel:为了复用一个连接,一个connection下可以有多个channel,可以把connection理解成电线,channel就是电线里面的铜丝。

消息传递的完整流程是这样的:

  1. 生产者初始化一个到rabbitmq服务的连接
  2. 获取连接的管道,通过管道声明一个交换机
  3. 通过管道声明一个队列,通过绑定的路由键将队列和交换机绑定(发送消息的时候声明一个队列并绑定交换机,消息会进到队列里。如果不声明也可以放到消费者去声明队列和绑定交换机。需要注意的是生产者没有声明队列的话,此时已经生产多条消息,然后去开启消费者消费,是不会消费到之前的消息的)
  4. 通过管道发送消息到指定的交换机
  5. 消费者初始化一个到rabbitmq服务的连接
  6. 获取连接的管道,通过管道声明一个队列
  7. 通过绑定的路由键将队列和交换机绑定
  8. 从队列中消费消息

交换机类型:

  1. direct:直接指定到某个队列
  2. topic:发布订阅模式,一个交换机可以对应多个队列,通过路由规则匹配
  3. fanout:顾名思义,无脑广播模式

2、示例

生产者:

package main

import (
    "fmt"
    "time"

    "github.com/streadway/amqp"
)

var (
    conn    *amqp.Connection
    channel *amqp.Channel
    queue   amqp.Queue
    mymsg   = "Hello HaiCoder"
    err     error

    confirms chan amqp.Confirmation
)

func main() {
    // 建立连接
    conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
    if err != nil {
        fmt.Println(err)
        return
    }

    defer conn.Close()

    // 创建channel
    if channel, err = conn.Channel(); err != nil {
        fmt.Println(err)
        return
    }

  // 声明交换机
    err = channel.ExchangeDeclare("liutest", amqp.ExchangeDirect, false, false, false, false, nil)
    if err != nil {
        fmt.Println("ExchangeDeclare Err =", err)
        return
    }

    // 创建队列
    if queue, err = channel.QueueDeclare("liutest", false, false, false, false, nil); err != nil {
        fmt.Println("QueueDeclare Err =", err)
        return
    }

  // 队列和交换机绑定
    err = channel.QueueBind(queue.Name, "queueroutekey", "liutest", false, nil)
    if err != nil {
        fmt.Println("QueueBind Err =", err)
        return
    }

    channel.Confirm(false)
    confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1))
    //发送数据
    go func() {
        for {
            if err = channel.Publish("liutest", "queueroutekey", false, false, amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(mymsg),
            }); err != nil {
                fmt.Println("Publish Err =", err)
                return
            }
            fmt.Println("Send msg ok, msg =", mymsg)
            time.Sleep(time.Second * 5)
        }
    }()

    go func() {
        for confirm := range confirms {
            if confirm.Ack {
                fmt.Printf("confirmed delivery with delivery tag: %d \n", confirm.DeliveryTag)
            } else {
                fmt.Printf("confirmed delivery of delivery tag: %d \n", confirm.DeliveryTag)
            }
        }
    }()

    select {}

}

消费者:

package main

import (
    "fmt"

    "github.com/streadway/amqp"
)

var (
    conn    *amqp.Connection
    channel *amqp.Channel
    queue   amqp.Queue
    err     error
    msgs    <-chan amqp.Delivery
)

func main() {
    // 建立连接
    conn, err = amqp.Dial("amqp://guest:guest@127.0.0.1:5672/")
    if err != nil {
        fmt.Println(err)
        return
    }

    defer conn.Close()

    // 创建channel
    if channel, err = conn.Channel(); err != nil {
        fmt.Println(err)
        return
    }

    // 创建队列
    if queue, err = channel.QueueDeclare("liutest", false, false, false, false, nil); err != nil {
        fmt.Println("QueueDeclare Err =", err)
        return
    }

    err = channel.QueueBind("liutest", "queueroutekey", "liutest", false, nil)
    if err != nil {
        fmt.Println("QueueBind Err =", err)
        return
    }
    //读取数据
    if msgs, err = channel.Consume(queue.Name, "", false, false, false, false, nil); err != nil {
        fmt.Println("Consume Err =", err)
        return
    }
    go func() {
        for msg := range msgs {
            fmt.Println("Receive Msg =", string(msg.Body))
            msg.Ack(false)
        }
    }()

    select {}

}

3、消息可靠性

[图片上传失败...(image-1405be-1666743473091)]

生产者可靠性

// 将通道设置为确认模式
func (ch *Channel) Confirm(noWait bool) error {
    if err := ch.call(
        &confirmSelect{Nowait: noWait},
        &confirmSelectOk{},
    ); err != nil {
        return err
    }

    ch.confirmM.Lock()
    ch.confirming = true
    ch.confirmM.Unlock()

    return nil
}
// 用于接受服务端的确认响应
func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
    ch.notifyM.Lock()
    defer ch.notifyM.Unlock()

    if ch.noNotify {
        close(confirm)
    } else {
        ch.confirms.Listen(confirm)
    }

    return confirm

}

Confirm 将此通道置为确认模式,以便生产者可以确保服务端已成功接收所有消息。进入该模式后,服务端将发送一个basic.ack或basic.nack消息,其中deliver tag设置为一个基于1的增量索引(用来标识消息的唯一性),对应于该方法返回后收到的每次ack。

在 Channel.NotifyPublish上监听以响应ack。如果未调用 Channel.NotifyPublish,则ack将被忽略。

ack的顺序不受投递消息顺序的约束。

Ack 和 Nack 确认将在未来的某个时间到达。

在通知任何 Channel.NotifyReturn 侦听器后,立即确认不可路由的mandatory或immediate消息。当所有应该将消息路由到它们的队列都已收到传递确认或已将消息加入队列时,其他消息将被确认,必要时将消息持久化。

注:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,那么broker会调用basic.return方法将消息返还给生产者;当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,mandatory标志告诉broker代理服务器至少将消息route到一个队列中,否则就将消息return给发送者;

当 noWait 为真时,客户端不会等待响应。如果服务端不支持此方法,则可能会发生通道异常。

具体代码实现如下:

...

// 设置消息确认
channel.Confirm(false)
confirms = channel.NotifyPublish(make(chan amqp.Confirmation, 1))

...

go func() {
        for confirm := range confirms {
            if confirm.Ack { // 消息已确认
                fmt.Printf("confirmed delivery with delivery tag: %d \n", confirm.DeliveryTag)
            } else { // 未确认的消息可以重新发送
                fmt.Printf("failed confirmed delivery of delivery tag: %d \n", confirm.DeliveryTag)
            }
        }
    }()

...

消费者可靠性

// 将autoAck设置为false
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
    // When we return from ch.call, there may be a delivery already for the
    // consumer that hasn't been added to the consumer hash yet.  Because of
    // this, we never rely on the server picking a consumer tag for us.

    if err := args.Validate(); err != nil {
        return nil, err
    }

    if consumer == "" {
        consumer = uniqueConsumerTag()
    }

    req := &basicConsume{
        Queue:       queue,
        ConsumerTag: consumer,
        NoLocal:     noLocal,
        NoAck:       autoAck,
        Exclusive:   exclusive,
        NoWait:      noWait,
        Arguments:   args,
    }
    res := &basicConsumeOk{}

    deliveries := make(chan Delivery)

    ch.consumers.add(consumer, deliveries)

    if err := ch.call(req, res); err != nil {
        ch.consumers.cancel(consumer)
        return nil, err
    }

    return (<-chan Delivery)(deliveries), nil
}

立即开始消费排队的消息。

在 Connection 或 Channel 上的任何其他操作之前开始接收返回的 chan Delivery。

消息会继续往返回的 chan Delivery 传递,直到发生 Channel.Cancel、Connection.Close、Channel.Close 或 AMQP 异常。消费者必须在 chan 范围内确保收到所有消息。未收到的消息将阻塞同一连接上的所有方法。

AMQP 中的所有消息都必须得到确认。消费者在成功处理消息后最好手动调用 Delivery.Ack。如果消费者被取消或通道或连接被关闭,任何未确认的消息将在同一队列的末尾重新入队

消费者由一个字符串标识,该字符串是唯一的,适用于该channal上的所有消费者。如果希望最终取消消费者,请在 Channel.Cancel 中使用相同的非空标识符。空字符串将导致重新成唯一标识。消费者身份将包含在 ConsumerTag 字段中的每个消息中

当 autoAck(也称为 noAck)为真时,服务器将在将消息写入网络之前向该消费者确认确认。当 autoAck 为真时,消费者不应调用 Delivery.Ack。<u>自动确认消息意味着如果服务器投递消息后消费者无法处理某些消息,则可能会丢失某些消息</u>。

当exclusive 为true 时,服务器将确保这是该队列中的唯一消费者。当exclusive 为false 时,服务器将在多个消费者之间公平地分发消息。 RabbitMQ 不支持 noLocal 标志。建议对 Channel.Publish 和 Channel.Consume 使用单独的连接,以免在发布时 TCP 回推影响消费消息的能力,因此这里主要是为了完整性。当 noWait 为 true 时,不要等待服务器确认请求并立即开始消费。如果无法消费,则会引发通道异常并关闭通道。

消费消息时,将autoAck设置为false

func (d Delivery) Ack(multiple bool) error {
    if d.Acknowledger == nil {
        return errDeliveryNotInitialized
    }
    return d.Acknowledger.Ack(d.DeliveryTag, multiple)
}

客户端消费到消息后,需要调用ack确认接收到消息

AMQP 中的所有消息的投递都必须得到确认。如果使用 autoAck true 调用 Channel.Consume,那么服务端将自动确认每条消息,但是不应该调用此方法,因为这个不能保证消费端业务处理成功。所以,必须在成功处理消息后调用 Delivery.Ack。当multiple 为真时,此消息和同一通道上所有先前未确认的消息将被确认,这对于消息的批处理很有用(但是有个弊端就是,如果有一个出错了,所有批处理的数据都需要重发)。对于每个未自动确认的消息,都必须调用 Delivery.Ack、Delivery.Reject 或 Delivery.Nack

消费端的确认机制的实现:

...

//读取数据
    if msgs, err = channel.Consume(queue.Name, "", false, false, false, false, nil); err != nil {
        fmt.Println("Consume Err =", err)
        return
    }
    go func() {
        for msg := range msgs {
            fmt.Println("Receive Msg =", string(msg.Body))
      // 确认消息
            msg.Ack(false)
        }
    }()
...
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容