上一篇 <<<Rabbitmq示例之工作(公平)队列
下一篇 >>>Rabbitmq示例之路由模式Routing
1.核心思想
一个生产者投递消息,可以被多个不同的队列实现消费;
2.实现原理
多个不同的队列绑定相同交换机,生产者只需要将消息投递到交换机之后,在由交换机将消息转发到所有绑定的队列实现消费。
优点:生产者只针对交换机,交换机可以在控制台绑定或解绑多个队列,代码不用任何的变动,操作便捷。
3.核心代码
/**
* 生产者只需要关联交换机,交换机绑定了哪些队列由控制台设置。代码中定义了关联性,一旦控制台删除了,消费者也会收不到
*/
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabitMQConnection.getConnection();
Channel channel = connection.createChannel();
String msg = "交换机消息发布了";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
/**
* 邮件消费者
*/
public class MailConsumer {
private static final String QUEUE_NAME = "consumerFanout_email";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者...");
Connection connection = RabitMQConnection.getConnection();
final Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
/**
* 短信消费者
*/
public class SmsConsumer {
private static final String QUEUE_NAME = "consumerFanout_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者...");
Connection connection = RabitMQConnection.getConnection();
final Channel channel = connection.createChannel();
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取消息:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
3.效果演示
配置上解绑了短信的队列,代码不用任何的改动,就会出现下面的输出了
推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明