一、kafka
1、kafka消息模型总结
发送消息到topic,每个topic可以分成多个Partition,每个Partition对用一个消费者消费,属于无状态消息,Partition每个消息对应唯一的offset,通过zk保存信息,消费端维护offset,持久化属于日志型持久话默认七天删除消息。
2、消费失败处理方案(个人思考)
业务处理异常时,暂不提交offset,利用数据库(关系型或非关系型)保存失败的消息记录,根据失败策略处理相应消息。保存好记录之后可以提交offset。
失败处理可以隔五分钟再往对应的消息队列发送该消息(发送成功就次数+1,将消息id也传入消息队列,方便记录失败次数)复杂情况可能需要记录消息失败的次数,到达一定次数后,改为手工处理
3、保证不丢失消息处理
参考://www.greatytc.com/p/7a6deaba34d2
一般是要求起码设置如下4个参数:
1、给topic设置replication.factor参数:
这个值必须大于1,要求每个partition必须有至少2个副本在kafka服务端
2、设置min.insync.replicas参数:
这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧在producer端
3、设置acks=all:
这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了在producer端
4、设置retries=MAX
(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了我们生产环境就是按照上述要求配置的,这样配置之后,至少在kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失
二、RabbitMQ
1、消费失败处理方案
(1)相关配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
retry: #抛异常会按retry策略重发(建议不在程序内抛异常,记录失败消息,然后确认)
enabled: true #允许重发
max-attempts: 5 #重发次数
initial-interval: 30000 #重发间隔时间
acknowledge-mode: manual #不确认宕机重启时会重新消费,确认失败会一直重发
publisher-confirms: true # 如果消息没有到exchange,则confirm回调,ack=false,
# 如果消息到达exchange,则confirm回调,ack=true
publisher-returns: true #exchange到queue成功,则不回调return
#exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
消息手动确认模式的几点说明:
1、监听的方法内部必须使用channel进行消息确认,包括消费成功或消费失败
2、如果不手动确认,也不抛出异常,消息不会自动重新推送(包括其他消费者),因为对于rabbitmq来说始终没有接收到消息消费是否成功的确认,并且Channel是在消费端有缓存的,没有断开连接
3、如果rabbitmq断开,连接后会自动重新推送(不管是网络问题还是宕机)
4、如果消费端应用重启,消息会自动重新推送
5、如果消费端处理消息的时候宕机,消息会自动推给其他的消费者
6、如果监听消息的方法抛出异常,消息会按照listener.retry的配置进行重发,但是重发次数完了之后还抛出异常的话,消息不会重发(也不会重发到其他消费者),只有应用重启后会重新推送。因为retry是消费端内部处理的,包括异常也是内部处理,对于rabbitmq是不知道的(此场景解决方案后面有)
7、spring.rabbitmq.listener.retry配置的重发是在消费端应用内处理的,不是rabbitqq重发
(2)方案描述
参考:https://my.oschina.net/dengfuwei/blog/1595047
消费确认机制改为manual手动确认,在消费方法中try catch中,记录消费失败的消息,然后basicAck确认,通过自定义重试策略取出失败的消息重新消费,失败达到一定次数手动处理
需要注意的 basicAck 方法需要传递两个参数:
(1)deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
(2)multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
2、保证消息不丢失
失败重发参考:https://www.cnblogs.com/xujishou/p/6288623.html
1、设置消息持久化
2、利用confirm模式,发送失败重新放送
(很多帖子说,confirm模式但是confirm回调测试没有消息数据无法重发,建议:https://www.cnblogs.com/xujishou/p/6288623.html)
3、return exchange到队列失败回调,可以获取到消息相关消息可重发
confirm模式 重发消息,生成CorrelationData,重新发送
private CorrelationData getCorrelationData(String exchange, String routeKey, byte[] body) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReceivedExchange(exchange);
messageProperties.setReceivedRoutingKey(routeKey);
Message message = new Message(body, messageProperties);
CorrelationData correlationData = new CorrelationData();
correlationData.setReturnedMessage(message);
return correlationData;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("confirm消息发送成功:" + cause);
} else {
String msg = new String(correlationData.getReturnedMessage().getBody());
System.out.println("confirm消息发送失败:" + msg);
MessageProperties messageProperties = correlationData.getReturnedMessage().getMessageProperties();
rabbitTemplate.convertAndSend(messageProperties.getReceivedExchange(),
messageProperties.getReceivedRoutingKey(),
correlationData.getReturnedMessage(),
correlationData);
}
}
publisher-confirms: true # 如果消息没有到exchange,则confirm回调,ack=false,
# 如果消息到达exchange,则confirm回调,ack=true
publisher-returns: true #exchange到queue成功,则不回调return
#exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
三、疑问
发送消息时,消息发送成功,业务失败。业务成功消息发送失败?
保证业务处理成功后发送消息,发送失败一直重试发送消息。
四、demo
kafka:https://github.com/huangxiongbiao12/kafka.git
rabbitmq:https://github.com/huangxiongbiao12/rabbitmq-demo.git