上一篇 <<<Rabbitmq基础知识
下一篇 >>>Rabbitmq示例之工作(公平)队列
1.特点
默认的传统队列是为均摊消费,存在不公平性;
如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
2.程序代码示例
2.1依赖包
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5 </version>
</dependency>
</dependencies>
2.2 连接类
public class RabitMQConnection {
public static Connection getConnection() throws IOException, TimeoutException {
// 1.创建我们的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置我们的连接地址
connectionFactory.setHost("10.211.55.16");
// 3.设置我们的端口号
connectionFactory.setPort(5672);
// 4.设置账号和密码
connectionFactory.setUsername("jiang");
connectionFactory.setPassword("123456");
// 5.设置VirtualHost
connectionFactory.setVirtualHost("/mytest1205");
return connectionFactory.newConnection();
}
}
2.3 生产者
public class Producer {
private static final String QUEUE_NAME = "test1205";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("生产者启动成功..");
// 1.创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建我们通道
Channel channel = connection.createChannel();
for (int i = 0; i < 10; i++) {
String msg = "发送测试内容-" + i;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("生产者发送消息成功:" + msg);
}
channel.close();
connection.close();
}
}
2.4 消费者
public class Consumer {
private static final String QUEUE_NAME = "test1205";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建我们的连接
Connection connection = RabitMQConnection.getConnection();
// 2.创建我们通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费消息msg:" + msg);
}
};
// 3.创建我们的监听的消息
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
3.测试效果
消费者开多个时,默认是采用轮询(均摊)机制
推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq示例之RPC模式
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明