spring-rabbitmq测试

一、基本介绍

spring-rabbitmq-思维导图.png

1.1 maven依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

spring-rabbit依赖了amqp-client,同时对其进行了封装,使用CachingConnectionFactory去管理connection,使用spring-retry实现重试机制。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

二、Spring Boot RabbitMQ启动流程

Rabbitmq-starter初始启动流程图.png

spring boot启动时,会执行RabbitAutoConfiguration的初始化,加载rabbitmq所需要的bean,其中CachingConnectionFactory 在初始化的时候,会创建一个connection,尝试与rabbitmq server建立连接,如果server处于异常状态,将出现如下所示的异常:


spring boot rabbitmq connection error.png

如上所示,应用会持续进行connect尝试,直到成功建立连接。如果成功如下所示:


rabbitmq-client启动成功.png

然后,就可以进行正常的消息的生产,消息的消费。

三、生产者

3.1 发送流程

//1 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

//2 设置RabbitMQ相关信息
factory.setHost("127.0.0.1");
factory.setUsername("springcloud");
factory.setPassword("springcloud");
factory.setPort(5672);

//3 创建一个新的连接
Connection connection = factory.newConnection();

//4 创建一个通道
Channel channel = connection.createChannel();

//5 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//6 声明为确认模式
channel.confirmSelect();

//7 发送消息到队列中
for (int i = 0; i < 10; i++) {
    String message = "Hello RabbitMQ, I'm chandler" + i + " !";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
    System.out.println("Producer Send +'" + message + "'");
}

//8 关闭通道和连接
channel.close();
connection.close();

如上代码所示,是一个标准消息生产者的生命周期,根据整理获得如下生命周期流程图:


Rabbitmq发送消息流程.png

Spring-Rabbit将如上流程进行了重新构建,使用 CachingConnectionFactory 实现connection的管理和复用,使用 RetryTemplate 实现消息发送高可用[重试机制],使用 RabbitTemplate 管理消息的发送和接收,发送的核心函数如下所示


RibbatTemplate#doSend.png

对上图两个红框进行追踪,整理出如下Spring-Rabbit发送流程的时序图:


Rabbitmq发送消息-时序图.png

3.2 确认机制

我们使用Confirm的时候,首先要设置Channel为Confirm模式,即向broker端发送Confirm.Select。confirmSelect()代码如下:
Channel主要实现类是 ChannelN

@Override
public Confirm.SelectOk confirmSelect()
    throws IOException
{
    if (nextPublishSeqNo == 0) nextPublishSeqNo = 1;
    return (Confirm.SelectOk)
        exnWrappingRpc(new Confirm.Select(false)).getMethod();
}

如上所示,当Channel开启Confirm模式的时候,nextPublishSeqNo=1,标记第一条publish的序号。如下是client进行push时候的源码:


push.png

如上所示,将publish序号存入一个线程安全的set中进行管理,查看 waitForConfirms(),当超时时间为0时,线程将会一直等待server响应,可以看到如下所示源码:


waitForConfirms.png

RabbitTemplate在设置确认机制时,将自身作为listener作为监听器。在server端接收到消息之后应答反馈“收到了“,不过只是确保了消息发送到queue,并不保证消息最终会被消费。


RabbitTemplate#setupConfirm.png

四、消费者

4.1 消费流程

//获取connection连接
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();

//设置RabbitMQ相关信息
factory.setHost("127.0.0.1");
factory.setUsername("springcloud");
factory.setPassword("springcloud");
factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();

//创建通道,你必须要声明一个消息消息队列,然后向该队列里推送消息
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//在低版本中,new QueueingConsumer();的方式,但是这种方式已经被废弃了,不建议使用
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        String msg = new String(body, "utf-8");
        System.out.println("msg:" + msg);
    }
};

boolean autoAck = true;
channel.basicConsume(QUEUE_NAME, autoAck, defaultConsumer);

如上代码所示,是一个标准消息消费者的生命周期,根据整理获得如下生命周期流程图:

Spring-Rabbit将如上流程进行了重新构建,使用 SimpleMessageListenerContainer 管理 listener 实现消息的接收和处理,使用 RetryTemplate 实现消息消费的高可用[重试机制]。并实现完全的注解化,核心注解有@RabbitListener、@RabbitHandler、@EnableRabbit。如下是spring-Rabbit启动一个消费者监听器的简单示例:

/**
 * 消费者
 */
@Slf4j
@Component
@RabbitListener(queues = "topic.message1")
public class Receiver {

    @RabbitHandler
    public void process(String hello) throws Exception{
        log.info("Receiver : {}", JSON.parseObject(hello, MessageTestContext.class).toString());
    }
}

4.2 消费者容器

SimpleMessageListenerContainer 即简单消息监听器容器:

  • 这个类非常强大,我们可以对它进行很多的设置,用对于消费者的配置项,这个类都可以满足。它 有监听单个或多个队列、自动启动[使用注解]、自动声明功能[使用注解]。
  • 可以设置事务特性、事务管理器、事务属性、事务并发、是否开启事务、回滚消息等。
  • 支持动态设置,设置消费者数量、最小最大数量、批消费;消息确认模式、是否重回队列、异常捕获等等。
    通过对核心类 SimpleMessageListenerContainer 的初始化过程,已经start函数进行梳理,整理出如下时序图:


    Spring-Rabbitmq消息消费-时序图.png
RabbitListenerAnnotationBeanPostProcessor
RabbitListenerAnnotationBeanPostProcessor.png
  1. 根据注解发现监听器,收集信息
  2. 创建 rabbitAdmin 注册到spring容器中,并与 MethodRabbitListenerEndpoint 绑定
  3. 创建 RabbitListenerContainerFactory 注册到spring容器中
SimpleMessageListenerContainer
SimpleMessageListenerContainer.png
  1. RabbitListenerEndpointRegistry 创建消费者容器,并注册spring容器


    消费者容器.png
  2. MessageListenerContainer 的具体实现类是 SimpleMessageListenerContainer

  3. 生命周期被spring-context管理,也可以被AsyncRabbitTemplate 使用

  4. 获取有效的 RabbitAdmin (对rabbitmq-template封装)

  5. 通知唤醒所有消费者容器

  6. 初始化消费者,将封装好的消费者(AsyncMessageProcessingConsumer)放入线程池中

AsyncMessageProcessingConsumer

AsyncMessageProcessingConsumer 内部封装了 BlockingQueueConsumer,BlockingQueueConsumer 是具体的消费者。封装的原因是可以方便的管理和监控消费者,同时能记录消费者生命周期过程中的异常状态。

BlockingQueueConsumer#run

attemptPassiveDeclarations() 声明目标队列
channel.basicQos() 如果没有设置自动确认,从队列中预取目标数量的消息
consumeFromQueue(queueName) 从目标队列中获取消息实体

4.3 Delivery

ACK机制是RabbitMQ中处理数据安全性的另外一种设置。
由于消息在传递过程中无法保证一定到达目的地,需要一种确认机制来对消息是否成功进行确认。
RabbitMQ通过 basic.delivery来给消费者传递消息,每次发送,都会附带一个delivery tag ,就是说每个channel都会有一个特有的delivery tag。ACK 必须在相同的delivery tag的前提条件下进行确认。
receiveAndExecute函数
消费者的消费处理逻辑主要在 SimpleMessageListenerContainer 这个实现类中完成,如下所示,receiveAndExecute 是确认机制实现的入口函数。

最后一个消息到达,开始进行确认逻辑

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Throwable {
    // 判断事务管理器是否存在
   if (this.transactionManager != null) {
      try {
         if (this.transactionTemplate == null) {
             // 如果不存在事务,则新建一个事务管理器,并将当前连接资源和事务绑定
            this.transactionTemplate =
                  new TransactionTemplate(this.transactionManager, this.transactionAttribute);
         }
         return this.transactionTemplate
               .execute(new TransactionCallback<Boolean>() {

                  @Override
                  public Boolean doInTransaction(TransactionStatus status) {
                     RabbitResourceHolder resourceHolder = ConnectionFactoryUtils.bindResourceToTransaction(
                           new RabbitResourceHolder(consumer.getChannel(), false),
                           getConnectionFactory(), true);
                     try {
                         // 在事务管理器内执行
                        return doReceiveAndExecute(consumer);
                     }
                     catch (RuntimeException e) {
                        prepareHolderForRollback(resourceHolder, e);
                        throw e;
                     }
                     catch (Throwable e) { //NOSONAR
                        // ok to catch Throwable here because we re-throw it below
                        throw new WrappedTransactionException(e);
                     }
                  }
               });
      }
      catch (WrappedTransactionException e) {
         throw e.getCause();
      }
   }
    // 若已存在事务,则直接执行
   return doReceiveAndExecute(consumer);

}

可以看到是通过事务机制来实现,当处理过程抛出异常的时候进行回滚,执行 BlockingQueueConsumer#rollbackOnExceptionIfNecessary。如下所示:


doReceiveAndExecute

rollbackOnExceptionIfNecessary函数

如果设置AcknowledgeMode为NONE或者MANUAL,将进入basicReject 函数执行逻辑,
basicReject 函数将向服务端抛deliveryTag ,声明消息未被消费。

public enum AcknowledgeMode {
    NONE,
    MANUAL,
    AUTO;
    
    public boolean isTransactionAllowed() {
       return this == AUTO || this == MANUAL;
    }
    
    public boolean isAutoAck() {
       return this == NONE;
    }
    public boolean isManual() {
       return this == MANUAL;
    }
    
}

五、测试

异常场景服务端模拟:

  • 服务端集群模式:
    • 单节点
    • 集群
  • 服务端节点状态:
  1. 节点宕机
  2. 节点启动
  3. 节点卡顿
  4. 节点恢复
    异常场景客户端模拟:
  • RabbitMQ工作模式
  1. Publish/Subscribe 发布订阅、exchange
  2. Routing 路由
  • 客户端动作
    • 一、topic创建
    • 二、消息发送
    • 三、消息消费
  • 确认模式,自动确认
  1. 生产者确认
  2. 消费者确认
  • 重试机制 有
  1. 重试策略
  2. 生产时异常
  3. 消费时异常
    • Redelivered:BlockingQueueConsumer#rollbackOnExceptionIfNecessary
  • 异常统计:
    • A.超时异常
    • B.心跳异常
    • C.socket中断异常
    • D.connection拒绝
    • E.connection阻塞
    • F.异常堆栈
      序号 场景 客户端动作 CPU memory single cluster 结果
      1 3、5、6、7、8 一、二、三 充沛 充沛 是 只出现异常A,其它异常都未出现
      2 3、6、7、8、 二、三 充沛 不足 是 只出现异常A,其它异常都未出现
      3 1、2、6、7、8 一、二、三 充沛 不足 是 出现D,并可能出现socket写入失败
      4 3、4、5、7、8 一、二、三 不足 充沛 是 异常A,E,各种类型的超时

六、总结

 本次工作的核心内容包含,梳理spring-rabbitmq生产者生产流程和消费者消费过程,测试客户端与服务端交互,如下图所示:

 一种成熟技术一般都经历了好几年时间的洗礼,功能变得不断完善,内部原理与执行逻辑也变得更加复杂。我们作为使用者,很难对其做到完全掌握,每个人的掌握的程度也有浅有深。对spring-rabbit的梳理,可以来带以下几个好处:

  1. 梳理清楚生产者的生产流程和消费者的消费过程,对于理解一些问题的产生带来便利。当未来可能产生的问题,可以降低理解成本,有利于更快的定位问题源。
  2. 梳理整理出的文档可以落实到wiki,一则方便记录以便后续更新完善,二则有利于降低组内成员的理解差差距,方便大家在一个理解层面上沟通。
  3. 技术的利用逐步升华,组件的开发逐步优化,应该有一个更优的方法论持续推进。以下是我个人整理的技术利用的方法论,如果有补充的可以联系我
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容