什么是消息中间件
消息队列中间件(Message Queue Middleware) 简称MQ是指用高效可靠的消息传递机制与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。提供消息传递和消息排队模型,在分布式环境下扩展进程间通信。
目前比较主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。消息中间件适用于需要可靠的数据传送的分布式环境,能够在不同平台间通信,被用来屏蔽各种平台和协议之间的特性,实现应用程序之间的协同,在任何时刻都能将消息进行传送和存储转发。
消息中间件的作用
- 解耦
在项目启动之初预测将来会碰到什么需求是极其困难的,消息中间件在处理过程中插入了一个隐含的、基于数据的接口层,这允许你独立的修改或扩展两边的处理过程。 - 冗余
在有些情况下,处理过程会失败,消息中间件可以把数据进行持久化知道它们被完全处理。 - 扩展性
因为MQ解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的。 - 削峰
消息中间件能够使关键组件支撑访问压力,不会因为突发的超负荷请求而完全崩溃。 - 可恢复性
- 顺序保证
大多数场景下,处理数据的顺序很重要,大部分的消息中间件支持一定程度的顺序性。 - 缓冲
- 异步通信
RabbitMQ的起源
RabbitMQ是用Erlang语言实现的AMQP(Advanced Message Queuing Protocol, 高级消息队列协议)的消息中间件,最初起源于金融系统。
为了打破大型机构对于实时消息通信功能的商业壁垒,同时为了能够让消息在各个队列平台之间共通,JMS(Java Message Service)应运而生。JMS试图通过提供公共Java API的模式,隐藏MQ的实现细节,只要选择合适的MQ驱动即可。ActiveMQ就是JMS的一种实现。不过尝试使用单独标准化接口来胶合众多不同接口,最终会让应用程序变的更加脆弱。
2006年6月,由Cisco、Redhat、iMatix等联合制定了AMQP的公开标准。他是应用层协议的一个开放标准。基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
RabbitMQ有一下几个特性:
- 可靠性
提供持久化、传输确认及发布确认机制 - 灵活的路由
通过交换器来路由消息,对于复杂的路由功能,可以通过绑定多种交换器实现 - 扩展性
多个RabbitMQ可以组成集群,并动态扩展节点 - 高可用性
- 多种协议
除了原生AMQP协议,还支持STOMP、MQTT等多种中间件协议 - 多语言客户端
- 管理界面
- 插件机制
RabbitMQ的安装
- Mac下使用homebrew安装,无需自己搭建erlang环境
brew install rabbitmq
- 启动RabbitMQ
brew services start RabbitMQ
- 页面登录 localhost:15672
账号:guest
密码:guest
配置管理账户
# 切换至rabbitmq的安装目录
cd /usr/local/Cellar/rabbitmq/3.7.15/sbin
# 添加账号
./rabbitmqctl add_user root 123456
# 添加访问权限
./rabbitmqctl set_permissions -p "/" root ".*" ".*" ".*"
# 设置超级权限
./rabbitmqctl set_user_tags root administrator
使用测试
生产者:
package com.pctf.demo;
import com.rabbitmq.client.*;
public class MessageProducer {
public static final String EXCHANGE_NAME = "exchange_demo";
public static final String ROUTING_KEY = "routingkey_demo";
public static final String QUEUE_NAME = "queue_demo";
public static final String HOST = "127.0.0.1";
public static final int PORT = 5672;
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 创建一个持久化的,非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);
// 创建一个持久化的,非排他的,非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 通过路由键绑定交换器与队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
String message = "hello, world";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();
}
}
消费者:
package com.pctf.demo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class MessageConsumer {
public static final String QUEUE_NAME = "queue_demo";
public static final String HOST = "127.0.0.1";
public static final int PORT = 5672;
public static void main(String[] args) throws Exception {
Address[] addresses = new Address[]{
new Address(HOST, PORT)
};
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
Connection connection = connectionFactory.newConnection(addresses);
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("received message is " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
pom引用:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>