1 架构原理
1.1 应用场景
只支持发布订阅模式。
大数据量的消息堆积能力,最终数据是持久化到磁盘上,理论上无限大。
用于业务消息的收发,日志消息的收发请用kafka生态。
1.2 namesrv
namesrv节点是无状态的,用于对整个rocketmq集群的状态保持,包括多少主节点、从节点、topic路由(topic分布在哪些节点上),周期性的健康检查等等。
与broker节点保持长连接。
心跳:间隔30s,心跳超时时间10s,这两个参数无法改变。
一旦连接断开,namesrv会立即感知,但不会立马通知生产者,消费者。需要生产者消费者自身定时获取最新的rocketmq元数据信息
1.3 broker
实际收发消息的节点。
broker节点分主节点(brokerid=0)、从节点(brokerid=其他)。简单理解,从节点是主节点的拷贝,复制。
具体如何复制,有两种集群模式异步复制、同步双写。
异步复制:从节点从主节点异步复制commitlog,所以存在小概率的数据丢失风险
同步双写:为了解决异步复制存在的缺陷,同步双写模式是生产者同步往主、从节点写数据,从而保证主从节点的数据是完全一致的。但性能略低于异步复制。适用于高可靠性场景,例如电商
1.4 生产者、消费者
生产者负责生产消息,发消息。
消费者者负责订阅消息,消费消息。
生产者连接到namesrv,定时同步,获取rocketmq集群元数据。
与broker建立长连接,broker每隔10秒,检查连接,如果超过2分钟没有心跳,关闭连接。
消费模式:广播消费、集群消费。广播消费是指每个消费者都会收到同样的消息。集群消费模式则是按照一定分配策略,每个消费者负责消费不同的队列。
1.5 消息队列
如果每个topic由管理员手动在cluster创建,会在每个broker上默认创建8个队列。可以认为队列是一个无线长度的数组,offset是下标,消费者通过offset来访问消息。
问题:队列数量怎么设定,会对吞吐量造成影响吗?
2 数据持久化过程
2.1 发消息
producer-》broker
broker是一个java程序,数据最先到jvm堆上-》页缓存(物理内存,堆外内存)-》本地磁盘
2.2 收消息
NIO 零拷贝机制,无需拷贝到堆内存。
先从页缓存中找,如果找不到再从本地磁盘load
2.3 消息清理
默认:broker会定期删除过期的消息,默认有效期3天,每天凌晨4点执行这个操作。
3天前的消息会从队列尾部删除。
可在broker配置文件中修改
3 如何保证高可用
3.1 影响可靠性的几种情况
broker不可用、宿主机宕机、宿主机损坏、磁盘损坏等
主节点完全损坏数据不可恢复的情况下,怎么处理?
同步双写模式(v4.2),slave节点无法自动进行故障转移,此时就要线上新增主节点、从节点,等3天后,就可以将发生故障节点的从节点下线。
3.2 负载均衡
topic要保证分配到所有broker上,这里可以利用多个broker的负载均衡能力。默认为每个topic创建8个队列。假设有3个主broker,就有24个队列。
RocketMQ用一个叫ClientID的概念,来唯一标记一个客户端实例,一个客户端实例对于Broker而言会开辟一个Netty的客户端实例。 而ClientID是由ClientIP+InstanceName构成
发消息时,默认是采用轮询的策略往每个可用的队列发。
如果要发生顺序消息。则在发送的时候,可以根据业务ID,固定选择一个队列。
顺序消息存在的缺陷:当指定队列所在的broker不可用时,由于队列总数发生变化,hash取模后的定位队列发生变化,会导致短暂的乱序。
当一个消费者组里上线下线节点的时候,怎么重新分配队列呢?
在消费者触发
1、每个客户端在启动时会去namesrv上获取元数据,topic所在的broker、有多少队列
2、获取该组里的其他节点,知道组里的其他兄弟姐妹
3、除非rebalance,重新分配各自负责的队列,
节点下线怎么办?什么时候触发?触发条件都有哪些?
1、20s定时reblance
2、所有consumer收到broker的consume变化通知,例如上线、下线
3、每次client启动时。
心跳间隔要短些,1秒 2秒。这里心跳超时就很关键了,不能太长,也不能过于短。
broker多久会把已经下线的consume踢掉呢?前面讲到是2分钟,太滞后了?
实际测试情况,版本号v4.3:
起一个生产者线程;2个consumer,中途停掉一个consumer,查看另一个consumer是否已经接手所有topic 队列。
默认配置下consumer上下线很快就能触发reblance,这个配置并不是2分钟。所以放心使用吧,不用担心failover太长。
3.3 failover故障转移能力
slave节点挂掉,无影响;
master挂掉的时候,该节点负责的topic的写能力会受影响,读没有影响。slave无法自动升级为master节点,也没这个必要,简单粗暴。
3.4 重试机制
可以在producer初始化的时候配置多样的重试策略,可向同一个broker发起多次重试,broker失败后重试其他broker。初始化配置如下:
```
DefaultMQProducer producer =new DefaultMQProducer(group);
producer.setNamesrvAddr(nameServer);
producer.setInstanceName(instanceName);
producer.setCompressMsgBodyOverHowmuch(compressMsgBodyOverHowmuch);
producer.setMaxMessageSize(maxMessageSize);
producer.setRetryAnotherBrokerWhenNotStoreOK(retryAnotherBrokerWhenNotStoreOk);
producer.setRetryTimesWhenSendAsyncFailed(retryTimesWhenSendAsyncFailed);
producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
// 使用故障延迟机制,会对获取的MQ进行可用性验证
producer.setSendLatencyFaultEnable(true);
producer.setSendMsgTimeout(sendMsgTimeout);
//设置到broker的心跳
producer.setHeartbeatBrokerInterval(3000);
//从namesrv获取topic路由
producer.setPollNameServerInterval(3000);
```
3.5 一些健康检查的时间
生产者与消费者会延迟(默认30s心跳),发往该broker的消息失败,此时要使用重试机制。
心跳间隔可以在producer或consumer初始化的时候指定heartbeat参数。
4 最佳实践
4.1 配置优化
autoCreateTopicEnable=false 生产环境禁用自动创建topic ,由管理员手工在cluster的每个节点上创建,why?
storePathRootDir=/rocketmq/store 设置消息内容的根目录,如果是docker容器化,则需要把此路径外挂到外部存储
storePathCommitLog=/rocketmq/store/commitlog 设置commitlog日志的根目录
,如果是docker容器化,则需要把此路径外挂到外部存储
maxMessageSize 500000 500k
sendMessageThreadPoolNums=128 # 发消息线程池 ,
pullMessageThreadPoolNums=128 拉消息线程池
useReentrantLockWhenPutMessage=true#否则报flow control异常
4.2 DOCKER容器化 部署
集群容器化部署请参考:
https://github.com/wuzuquan/docker-file/tree/master/rocketmq
注意RocketMQ只有一种模式,即发布订阅模式。
1、多master slave模式,异步复制 ,当master宕机时,会丢失极少量的消息
2、多master slave模式,同步双写,性能稍低,电商场景推荐使用此模式。
docker build -f dockerfile-broker -t 11.4.76.193/redis/rocketmq-broker:4.3.0 .
docker build -f dockerfile-namesrv -t 11.4.76.193/redis/rocketmq-namesvr:4.3.0 .
编写docker-compose,编排 部署。
在docker环境下的坑: host模式下, brokerip1异常,往namesrv注册的ip地址变为docker虚拟网络的网关ip。发消息会返回:SLAVE_NOT_AVAILABLE
云生产环境,只能使用托管模式,host模式slave复制失效
rocketmq消息日志:
conf目录下有几个logback*.xml文件,将文件中的${user.home}/logs/rocketmqlogs/broker_default.log修改为目标日志路径即可
否则默认在root目录下,一会就撑满,或者把日志级别调高
测试,在namesrv节点上:
添加环境变量:export NAMESRV_ADDR=localhost:9876
测试生产者:sh tools.sh org.apache.rocketmq.example.quickstart.Producer
测试消费者:sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
4.3 客户端
客户端配置:
heartbeatBrokerInterval 5000,默认30s,向broker发生心跳
pollNameServerInterval 3000,轮询namesrv的时间
4.4 生产者发消息
1、一个应用使用一个topic,topic由管理员创建
2、设置keys字段,用于定位排查。keys要保持唯一性
3、无论发生成功还是失败,打印消息操作日志 :sendresult、key
4、对于不可丢失的消息,要有重试机制,可在producer初始化时统一配置
5、对于可靠性要求不高的应用,可以用oneway方式异步发生
6、顺序消息:简单的讲,每个topic都有N个队列,保证把消息发生到同一个队列就是顺序消息了。但是顺序消息无法利用failover特性,当队列所在的master挂掉,如果主备切换有一小段时间不可用,顺序消息会发送失败。
顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列 总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适
4.5 消费者
1、消费过程要做到幂等性(消费端去重机制)
根据keys字段进行去重,消费过的可以存入redis或db中
2、批量方式消费
consumer.setConsumeMessageBatchMaxSize(50);
consumer.setConsumeThreadMin(2);
consumer.setConsumeThreadMax(5);
3、必须要有消费者组,由管理员后台创建
每个组有自己独立的offset,可以独立的消费同一个topic
4、从哪里开始消费?
comsumefromwhere配置指定:
CONSUME_FROM_LAST_OFFSET 从上一次消费开始
CONSUME_FROM_FIRST_OFFSET ,从头开始消费
CONSUME_FROM_TIMESTAMP ,从指定时间开始
6 v4.3新特性:事务消息
在之前版本中,事务消息曾经被阉割过,应该是不成熟。4.3版本又重新推出此功能乃一大利好,而且不依赖于其他外部组件。
什么叫事务消息:简单的讲,业务逻辑与发消息两个操作,要么同时成功,要么同时失败。
对于电商交易过程中消息强一致性需求,可以采用此种消息模式。
也可以借助事务消息实现弱一致性的分布式事务,尤其是上游业务无需关注下游业务是否成功,来决定是否需要回滚的场景。
事务消息代码示例参考官方github,或这里:
https://github.com/wuzuquan/microservice/blob/master/core/src/test/java/com/xmair/core/TransactionMQTest.java
事务消息源码分析:https://blog.csdn.net/prestigeding/article/details/81263833
prepare主题消息:RMQ_SYS_TRANS_HALF_TOPIC
commit or rollback:RMQ_SYS_TRANS_OP_HALF_TOPIC(对client不可见)
事务消息发生过程:
1、发送方:先往half队列发prepare消息
2、发送方:执行本地事务
3、发送方:如果commit,就发生commit消息,下发给订阅者;如果rollback就发生rollback消息,删除prepare,不下发。
4、rocketmq:如果没有接收到任何信息,可能超时啦,出了各种异常,咋办?回查事务状态,有可能发送方实例已经宕机,需要回查同一个生产者组的其他实例来获取状态。具体怎么获取?参考rocketmq的事务消息示例代码即可。
5、consumer段消息成功机制保障
一些配置:
检测频率在哪里设置,在broker.conf中设置transactionCheckInterval,太长了,如果要结合分布式事务,不可能给与这么长的超时时间,1-2秒较合适。
检测的时候只会对超时的prepare消息检测,超时时间在哪里设置?
transactionTimeOut,默认3秒,根据具体情况设定。
重试几次?transactionCheckMax,如果超过这个检测次数,丢弃消息,rollback。默认是5次,太多了。2-3次即可。这里有个难以解决的问题,假设producer业务已操作成功,但是所有producer都死了,没法回查,消息被超时rollback。导致下游业务接收不到消息。
6 从哪里读取消息
总结下主从切换流程:
1、当Master和Slave都正常的情况下,默认从Master处读取消息,若开启了slaveReadEnable ,且Master处积压了超过40%内存总量的未消费消息,那么会从Slave=1的Broker处读取消息。
2、当Master宕机时,长时间未向Namesrv发送心跳,Namesrv清空下线的BrokerData,Consumer从Namesrv获取的TopicRouteData里已经没有了Master的BrokerData,因此Consumer从自身的brokerAddr集合里找不到Master的BrokerAddr了,因此就按顺序向第一位Slave发送消息拉取请求。默认配置下slaveReadEnable = false,因此Slave在从CommitLog读取到消息后,设置其suggest brokerId = 0 ,也就是建议其下次从Master处读取消息。
3、在Master为恢复前,都是读取不到其brokerAddr的,因此每次都是从Slave处拉取消息,虽然其每次都建议Consumer从Master处读取。
4、当Master恢复了,从新向Namesrv发送了心跳,注册了Broker信息。Consumer获取的最新TopicRouteData里包含了Master的brokerAddr,那么下次就会听从建议从Master处读取消息,从Slave切换回Master。
7 linux内核优化
http://rocketmq.apache.org/docs/system-config/
#sysctl.conf
echo vm.max_map_count=655360 >> /etc/sysctl.conf
echo vm.overcommit_memory=1 >> /etc/sysctl.conf
echo net.core.somaxconn= 16384 >> /etc/sysctl.conf
echo vm.swappiness=0 >> /etc/sysctl.conf
echo vm.min_free_kbytes=5000 >>/etc/sysctl.conf
8 运维监控
安装官方console工具
9 常用命令
cd /ROCKETMQ/bin
创建topic:
sh mqadmin updateTopic -c DefaultCluster -n localhost:9876 -t testtopic
查看topic列表:
sh mqadmin topicList -n 11.4.74.48:9876
查看指定topic路由信息:
sh mqadmin topicRoute -n 11.4.74.48:9876 -t testtopic1
查看指定topic状态:
sh mqadmin topicStatus -n 11.4.74.48:9876 -t testtopic1
查看指定topic的消息:
sh mqadmin queryMsgByOffset -b broker-b -i 1 -o 0 -t NewUpt_PsgEventTopic -n localhost:9876
创建消费者组:
sh mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g push-consumer-group1
查看消费者组:
sh mqadmin consumerProgress -n localhost:9876
查看集群信息:
sh mqadmin clusterList -n 11.4.74.48:9876
查看broker状态
sh mqadmin brokerStatus -b 11.4.74.44:10911 -n 11.4.74.48:9876
创建生产者组:
创建订阅组:
sh mqadmin updateSubGroup -c DefaultCluster -g subgroup1 -n 11.4.74.48:9876果是对集群扩容,则可以通过指定新的broker地址在扩容的机器上创建一份新的订阅组信息
根据key查消息:
sh mqadmin queryMsgByKey -k test100 -t testtopic1 -n 11.4.74.48:9876
根据unique key查:(非msgid )
sh mqadmin queryMsgByUniqueKey -i 0B0D308C16C418B4AAC236B77FFB03E3 -n 11.4.74.48:9876 -t testtopic1