背景
Logback 配置了 AmqpAppender 用于将日志输入 RabbitMQ :
<appender name="TRACE-RABBITMQ" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern>
{
"level": "%level",
"service": "${springAppName:-}",
"time": "%d{yyyy-MM-dd HH:mm:ss}",
"ip": "%X{ip:-}",
"trace": "%X{traceUuid:-}",
"message": "%message"
}
</pattern>
</layout>
<!--rabbitmq地址 -->
<host>${RABBITMQ_HOST}</host>
<port>${RABBITMQ_PORT}</port>
<username>${RABBITMQ_USERNAME}</username>
<password>${RABBITMQ_PASSWORD}</password>
<declareExchange>true</declareExchange>
<exchangeType>direct</exchangeType>
<exchangeName>xxx.log</exchangeName>
<routingKeyPattern>trace</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>true</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<autoDelete>false</autoDelete>
</appender>
Logstash 配置了从 RabbitMQ 读取消息:
input {
rabbitmq{
host => "192.168.xx.xx:5672"
vhost => "/"
user => "xxx"
password => "xxx"
exchange => "xxx.log"
exchange_type => "direct"
key => "trace"
durable => true
codec => json
ack => true
}
}
在 RabbitMQ 中创建 xxx.log 交换机,direct 类型;再创建 log_queue 的队列并绑定到 xxx.log,路由键为 trace。
现象
- 消息能发送到 log_queue,Logstash 也能正常收到消息并将消息导入 ElasticSearch,ES 中能看到具体的消息。
- 但是 log_queue 队列中的消息一直在增加,从来不减少。。。
分析
走了很多弯路,最后回到 MQ 的文档,慢慢看核心概念,最终反应过来,消费者只需要指定队列就可以了。
解决
结合logstash基于RabbitMQ的输入配置 文章,我们在 LogStash 的配置中只需要指定队列名就行了:
input {
rabbitmq{
host => "192.168.xx.xx:5672"
vhost => "/"
user => "xxx"
password => "xxx"
queue => "log_queue"
durable => true
codec => json
ack => true
}
}
之所以之前 LogStash 还是能收到消息,猜测是它直连了 Exchange,所以从 Exchange 中获取到了消息,而同时消息还要进入 log_queue,由于没有任何消费者连接 log_queue,故该队列中的消息会一直积压。