- 生产者确认
生产者确认方式分为两种
confirm方式:
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();
//声明一个持久化的 队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//开启confirm模式
channel.confirmSelect();
String msg = "我是confirm模式消息";
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
try {
// 等待回复,如果回复true
if (channel.waitForConfirms()) {
System.out.println("发送成功");
}
else {
System.out.println("发送失败");
}
}
catch (InterruptedException e) {
e.printStackTrace();
System.out.println("发送失败");
}
channel.close();
connection.close();
}
事物方式:
String message = "Hello RabbitMQ:";
try {
channel.txSelect();
for (int i = 0; i < 5; i++) {
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, (message + i).getBytes("UTF-8"));
}
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
以上两种方式,confirm性能比较高,但是需要手工介入,需要自己写逻辑来重新发送,当涉及到多条消息的时候编码也比较麻烦,需要自己定义确认失败的时候的处理逻辑。事物方式编码比较简单,当消息发送失败可回滚,如果发送多条消息,其中一部分失败会把多条消息都回滚。对于我们的项目如果对于发送消息的性能没有特别高的要求,可以采用事物方式。
- 消费者确认
消费者确认或者说消费者应答指的是RabbitMQ需要确认消息到底有没有被收到 。消费者确认也有两种方式,1-自动应答,2-手动应答。自动应答存在缺陷,消息被消费者接收到就应答,但是如果消费者消费消息失败,这时候会造成消息丢失。手动应答我们可以手工控制,在消息被消费者消息完之后再手工给予应答,这样消息不会丢失。
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
Thread.sleep(100000);
当消息消费的时候,需要按照业务进行数据库的更新,这个时候如果数据库更新失败了(也许数据库宕机了),这个时候我们可以将对消息进行手工拒绝。
//requeue=true,表示将消息重新放入到队列中,false:表示直接从队列中删除,此时和basicAck(long deliveryTag, false)的效果一样
void basicReject(long deliveryTag, boolean requeue);