SpringBoot 与 RabbitMQ

简介

RabbitMQ是一款开源的消息队列中间件, 使用Advanced Message Queuing Protocol (AMQP) 为协议来处理队列消息。由于当下分布式系统越来越普及,消息队列中间件也越来越被得到使用。

和RabbitMQ同类型的消息中间件也有

  • ActiveMQ
  • RocketMQ
  • Kafka

这篇文章暂时不分析几种MQ的比较,主要是写一些SpringBoot与RabbitMQ的使用

Ubuntu 上简单安装 RabbitMQ

首先找一台Linux服务器.

这里我用的是Ubuntu 16.04(ubuntu-xenial)

添加下载信息

> echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
> curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
> apt-get update

安装RabbitMQ

> apt-get install rabbitmq-server

安装完后咋们看一下RabbitMQ 有没有跑起来

> service rabbitmq-server status

如果跑起来的话会显示RabbitMQ的基本信息,没有的话运行

> service rabbitmq-server start

接下来我们需要一个后台的控制页面来管理我们的RabbitMQ服务, RabbitMQ其实自带了一个管理后台,所以直接激活一下就行了

> rabbitmq-plugins enable rabbitmq_management

接下来我们需要创建一个可以登录的用户,刚开始可以创建一个admin管理员,并给这个用户最高权限

> rabbitmqctl add_user [username] [password]
> rabbitmqctl set_user_tags [username] administrator
> rabbitmqctl set_permissions -p "/" [username] ".*" ".*" ".*"

当用户创建完毕后访问 http://[你的服务器IP]:15672 , 然后会出现登录画面,用刚才创建的用户登录即可

RabbitMQ 的简单概念与使用

首先看一下面的模型,简单了来讲就是Publisher将消息发送到Exchange上,然后Exchange通过Routes的健值来发布到Queue里,然后Consumer去Queue里消费消息.

RabbitMQ消息队列模型图

这边有三种常用的Exchange的方式

  • Direct Exchange
  • Topic Exchange
  • Fanout Exchange

下面就对上面三种方式我们来进行一下代码的实现

Direct Exchange

Direct Exchange 是RabbitMQ 默认的交换形式,从图上可以看出, 消息发送方直接将带有Routing Key: green 这个参数发送到Exchange中,然后Exchange再将消息分配到对应的Queue中

Direct Exchange Type

Spring 代码实现如下

Producer 部分

@Component
public class DirectProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This direct exchange message";
        this.rabbitTemplate.convertAndSend("testqueue" , context);
    }
}

代码中将“testqueue” 做为routing key, exchange会将消息发送到名为“testqueue”的Queue里面

Consumer部分

@Component
@RabbitListener(queues = "testqueue")
public class DirectConsumer {

    @RabbitHandler
    public void consume(String context) {
        System.out.println("Direct Exchange Consumer  : " + context);
    }
}

Consumer对“testqueue” 进行监听然后消费消息。

Topic Exchange

Topic Exchange的模式跟Direct 相似,也需要传routing key,但是转发消息是通过通配符来做的。比如:

  • 路由键必须是一串字符,用句号(.) 隔开,比如说 main.queue

  • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:main.subqueue.*,那么就只能匹配路由键是这样子的:第一个单词是 main,第二个单词是 subqueue。

  • 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是main.subqueue.test.#,那么,以main.subqueue.test.one开头的路由键都是可以的。

Topic Exchange type

首先我们先造3个Consumer分别监听不同的queue,但是这里面监听的queue有绑定不同的通配符规则, 其中Annotation里的key包含了通配符规则

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueA",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.A")
)
public class TopicConsumerA {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueB",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "*.topic.*" )
)
public class TopicConsumerB {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer B  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "topicQueueC",durable = "true"),
                exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                key = "main.topic.*")
)
public class TopicConsumerC {
    @RabbitHandler
    public void consume(String message) {
        System.out.println("Topic Consumer C  : " + message);
    }
}

然后我们通过发送不同routing key来控制那些Consumer能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.A", context);

main.topic.A 这条routing key 应该是满足上面三个通配符规则的,所有三个Consumer 都能收到发送的消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "test.topic.A", context);

test.topic.A 只能满足A和B的通配符规则,即只有ConsumerA和ConsumerB能收到消息

this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.KK", context);

main.topic.KK 只有ConsumerB和C能收到

Fanout Exchange

Fanout Exchange模式是,不管routing key 只要这个queue绑定到fanout exchange 这些queues 都会收到消息,有点像广播的形式.

Fanout Exchange type

接下来我看一下Spring代码的实现部分

@Component
public class FanoutProducer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "This fanout exchange message";
        rabbitTemplate.convertAndSend("testFanoutExchange","", context);
    }
}

Producer将消息发送到"testFanoutExchange"的exchange中,第二个参数因为是Fanout模式所以我们不需要传routing key。

@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueA",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerA {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer A  : " + message);
    }
}
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "fanoutqueueB",durable = "true"),
                exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
)
public class FanoutConsumerB {

    @RabbitHandler
    public void consume(String message) {
        System.out.println("Fanout Exchange Consumer B  : " + message);
    }
}

这边consumer中只要将监听的queue绑定到接收消息的"testFanoutExchange" 即可.

这样在fanout这种模式下只要是绑定到指定的exchange上所有的queues都能收到消息

演示代码

以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-rabbitmq 目录下载并运行

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,351评论 2 34
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,092评论 3 51
  • 1 RabbitMQ安装部署 这里是ErLang环境的下载地址http://www.erlang.org/down...
    Bobby0322阅读 2,228评论 0 11
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,340评论 0 1