上一篇 <<<Rabbitmq示例之通配符模式Topics
下一篇 >>>Rabbitmq队列模式总结
1.核心思想
- RPC客户端启动后,创建一个匿名、独占的、回调的队列
- RPC客户端设置消息的2个属性:replyTo和correlationId,然后将消息发送到队列rpc_queue
- RPC服务端在队列rpc_queue上等待消息。RPC服务端处理完收到消息后,然后将处理结果封装成消息发送到replyTo指定的队列上,并且此消息带上correlationId(此值为收到消息里的correlationId)
- RPC客户端在队列replyTo上等待消息,当收到消息后,它会判断收到消息的correlationId。如果值和自己之前发送的一样,则这个值就是RPC的处理结果
2.核心代码
/**
* 客户端
* 1、生成临时队列[replyQueueName]用于接收回调数据
* 2、生成唯一标识[corrId]
* 3、将唯一标识和临时队列连同消息内容发送给服务器端的主队列
* 。。。。。阻塞队列监听服务器的消息反馈。。。。。
* 4、如果服务端已反馈,并且和唯一标识一致,则消费
*/
public class RpcClient {
private static final String RPC_QUEUE_NAME = "rpc_test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = RabitMQConnection.getConnection();
final Channel channel = connection.createChannel();
// 定义临时队列,并返回生成的队列名称
String replyQueueName = channel.queueDeclare().getQueue();
//本次请求唯一ID
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
// 唯一标志本次请求
.correlationId(corrId)
// 设置回调队列
.replyTo(replyQueueName)
.build();
String message = "我是来自客户端的请求";
// 发送消息,发送到默认交换机
channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
// 阻塞队列,用于存储回调结果
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
// 获取回调的结果
String result = response.take();
System.out.println(" 服务端响应数据:'" + result + "'");
channel.close();
connection.close();
}
}
/**
* 服务端
* 1、使用同步锁机制监听主队列的请求情况
*
* 2、如果有请求,则反馈消息到消费者的临时队列中
*/
public class RpcServer {
private static final String RPC_QUEUE_NAME = "rpc_test";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabitMQConnection.getConnection();
Channel channel = connection.createChannel();
// 设置同时最多只能获取一个消息
channel.basicQos(1);
System.out.println(" [RpcServer] Awaiting RPC requests");
// 定义消息的回调处理类
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 生成返回的结果,关键是设置correlationId值
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
// 生成返回
try {
Thread.sleep(1000 *1);
} catch (InterruptedException e) {
e.printStackTrace();
}
String response = "我是来自服务端的反馈";
// 回复消息,通知已经收到请求
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 对消息进行应答
channel.basicAck(envelope.getDeliveryTag(), false);
// 唤醒正在消费者所有的线程
synchronized (this) {
this.notify();
}
}
};
// 消费消息
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
推荐阅读:
<<<消息中间件的核心思想
<<<消息中间件常见问题汇总
<<<基于Netty简单手写消息中间件思路
<<<消息队列常用名词与中间件对比
<<<Rabbitmq基础知识
<<<Rabbitmq示例之点对点简单队列
<<<Rabbitmq示例之工作(公平)队列
<<<Rabbitmq示例之发布订阅模式
<<<Rabbitmq示例之路由模式Routing
<<<Rabbitmq示例之通配符模式Topics
<<<Rabbitmq队列模式总结
<<<Rabbitmq如何保证消息不丢失
<<<Springboot利用AmqpTemplate整合Rabbitmq
<<<Rabbitmq如何保证幂等性
<<<Rabbitmq的重试策略
<<<Rabbitmq通过死信队列实现过期监听
<<<Rabbitmq解决分布式事务思路
<<<Rabbitmq解决分布式事务demo
<<<Rabbitmq环境安装
<<<Kafka中的专业术语都有哪些
<<<Kafka的设计原理介绍
<<<Kafka集群如何实现相互感知
<<<Kafka如何实现分区及指定分区消费
<<<Kafka如何保证消息顺序消费
<<<Kafka如何保证高吞吐量
<<<Kafka集群环境搭建
<<<RocketMQ架构原理
<<<RocketMQ、RabbitMQ和Kafka的对比
<<<SpringBoot整合RocketMQ示例
<<<RocketMQ保证顺序消费demo
<<<RocketMQ如何动态扩容和缩容
<<<RocketMQ如何解决分布式事务
<<<RocketMQ单机版本安装
<<<RocketMQ集群环境程序启用相关知识点
<<<RocketMQ单机做主备实操
<<<RocketMQ所有配置说明