异常处理
RabbitMQ java client中的异常处理
消费消息,在消费消息的时候抛出异常,
消费启动类:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.util.concurrent.TimeUnit;
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
System.out.println(channel.isOpen());
channel.basicConsume("sms",true,new SimpleConsumer(channel));
TimeUnit.SECONDS.sleep(20);
System.out.println(channel.isOpen());
channel.close();
connection.close();
}
}
消费逻辑
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class SimpleConsumer extends DefaultConsumer{
public SimpleConsumer(Channel channel){
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println("-----收到消息了-----------");
System.out.println("消息属性为:"+properties);
System.out.println("消息内容为:"+new String(body));
throw new NullPointerException("空指针异常");
}
}
然后我们发现20s过后,发现控制台打印了
true
amq.ctag-fJ45VlTAV2aKO97-zztDNQ
-----收到消息了-----------
消息属性为:#contentHeader<basic>(content-type=null, content-encoding=null, headers={}, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
消息内容为:第一条消息
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=Closed due to exception from Consumer com.zhihao.test.day10.SimpleConsumer@2462e0fd (amq.ctag-fJ45VlTAV2aKO97-zztDNQ) method handleDelivery for channel AMQChannel(amqp://zhihao.miao@192.168.1.131:5672/,1), class-id=0, method-id=0)
at com.rabbitmq.client.impl.AMQChannel.processShutdownSignal(AMQChannel.java:286)
false
at com.rabbitmq.client.impl.ChannelN.startProcessShutdownSignal(ChannelN.java:282)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:596)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:530)
at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:523)
at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.close(AutorecoveringChannel.java:68)
at com.zhihao.test.day10.Consumer.main(Consumer.java:31)
我们发现当消费端抛出异常的时候,channel会关闭,然后channel.close()
会报错。原因是什么呢?
我们知道com.rabbitmq.client.ExceptionHandler
这个接口,中定义了各个阶段异常的捕获方法。其默认实现com.rabbitmq.client.impl.DefaultExceptionHandler
,继承com.rabbitmq.client.impl.StrictExceptionHandler
,发现当消费失败的时候会kill掉channel。
源码如下:
@Override
public void handleConsumerException(Channel channel, Throwable exception,
Consumer consumer, String consumerTag,
String methodName)
{
handleChannelKiller(channel, exception, "Consumer " + consumer
+ " (" + consumerTag + ")"
+ " method " + methodName
+ " for channel " + channel);
}
调用handleChannelKiller
方法
@Override
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
log(what + " threw an exception for channel " + channel, exception);
try {
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
} catch (AlreadyClosedException ace) {
// noop
} catch (TimeoutException ace) {
// noop
} catch (IOException ioe) {
log("Failure during close of channel " + channel + " after " + exception, ioe);
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
}
}
消息发送确认异常捕获:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Send {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
connectionFactory.setExceptionHandler(new DefaultExceptionHandler(){
@Override
public void handleConfirmListenerException(Channel channel, Throwable exception) {
System.out.println("=====消息确认发生异常=======");
exception.printStackTrace();
}
});
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("收到消息确认,:"+deliveryTag);
throw new IOException("数据库异常,确认失败");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
}
});
channel.basicPublish("","sms",null,"发送消息".getBytes());
TimeUnit.SECONDS.sleep(20);
channel.close();
connection.close();
}
}
Spring AMQP异常处理
设置AUTO确认的时候已经讲解了异常处理,这边主要讲解一下自动声明的时候的异常处理。
正常的情况下我们声明队列的代码:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public Queue infoQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("info",true,false,false,arguments);
}
@Bean
public Queue errorQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("error",true,false,false,arguments);
}
}
应用启动类:
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客户端第一次连接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
执行声明成功,但是假如我们的info队列已经存在,并且属性和自动声明的不一致,那么就会抛出异常造成info声明不了,糟糕的是error队列也声明不了。
此时执行发现抛出异常,并且声明不了队列。
控制台打印出异常堆栈信息:
信息: Starting beans in phase -2147482648
九月 29, 2017 11:49:30 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63190]
九月 29, 2017 11:49:30 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
严重: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
Exception in thread "main" org.springframework.amqp.AmqpIOException: java.io.IOException
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:71)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1461)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:500)
at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:419)
at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:33)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:571)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1430)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1387)
at org.springframework.amqp.rabbit.core.RabbitAdmin.getQueueProperties(RabbitAdmin.java:336)
at com.zhihao.miao.exception.Application.main(Application.java:18)
怎样去解决呢?
修改配置类,设置忽略声明异常
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
return factory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//设置忽略声明异常
rabbitAdmin.setIgnoreDeclarationExceptions(true);
return rabbitAdmin;
}
@Bean
public Queue infoQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("info",true,false,false,arguments);
}
@Bean
public Queue errorQueue(){
Map<String,Object> arguments = new HashMap<>();
return new Queue("error",true,false,false,arguments);
}
}
重新启动应用启动类,
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
@ComponentScan
public class Application {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
//使得客户端第一次连接rabbitmq
context.getBean(RabbitAdmin.class).getQueueProperties("**");
context.close();
}
}
此时发现队列error声明成功,info声明失败,控制台打印:
九月 29, 2017 11:51:23 下午 org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
信息: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
九月 29, 2017 11:51:24 下午 org.springframework.context.support.DefaultLifecycleProcessor start
信息: Starting beans in phase -2147482648
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory createBareConnection
信息: Created new connection: connectionFactory#163e4e87:0/SimpleConnection@ef9296d [delegate=amqp://zhihao.miao@192.168.1.131:5672/, localPort= 63244]
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.connection.CachingConnectionFactory log
严重: Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
九月 29, 2017 11:51:24 下午 org.springframework.amqp.rabbit.core.RabbitAdmin logOrRethrowDeclarationException
警告: Failed to declare queue: Queue [name=info, durable=true, autoDelete=false, exclusive=false, arguments={}], continuing... com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'info' in vhost '/': received none but current is the value 'zhihao.topic.exchange' of type 'longstr', class-id=50, method-id=10)
九月 29, 2017 11:51:24 下午 org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
信息: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@446cdf90: startup date [Fri Sep 29 23:51:23 CST 2017]; root of context hierarchy
九月 29, 2017 11:51:24 下午 org.springframework.context.support.DefaultLifecycleProcessor stop
信息: Stopping beans in phase -2147482648
此时也就达到了我们的需求。