RabbitMQ整合SpringCloud
你好!欢迎来到Java成长笔记,主要是用于相互交流,相互学习,也希望分享能帮到大家,如有错误之处,希望指正,谢谢!
整合RabbitMQ简介
1、SpringCloudStream定位是兼容主流消息中间件的集成使用,减少不同消息中间件集成的配置,它整合RabbitMQ配置相较简单,不需要定义相应的交换机、队列、以及关系绑定,使相应的配置减少。
2、SpringCloudStream通过上层结构上的处理,使消息生产端、消费端可以多样化,不需要拘泥于生产消费端使用相同的消息中间件。例如:生产端可以使用RabbitMQ,而消费端可以使用Kafka,让开发者省去了相应不同的配置的集成,开发者只需要使用好相应的几个注解,就能实现高性能的生产和消费的场景。
3、当然它也有一个非常大的问题就是不能实现消息的可靠性投递,也就是不能保证消息的100%可靠性,会存在少量的消息丢失。
引入依赖配置
主要依赖
// 指定统一配置
<properties>
<java.version>1.8</java.version>
<spring.cloud-version>Greenwich.SR6</spring.cloud-version>
<spring.boot-version>Brussels-SR17</spring.boot-version>
</properties>
// springcloud依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
// 其它配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
使用注解说明
@Output:输出注解,用于定义发送消息接口
@input:输入注解,用于定义消息的消费者接口
@StreamListener:用于定义监听方法的注解
@EnableBinding:启动绑定关系
消息生产端
生产端配置说明
spring:
cloud:
stream:
binders:
defaultRabbit: # 定义的名称,用于bidding整合
type: rabbit # 指定消息类型
environment:
spring:
rabbitmq: # rabbitmq 配置信息
addresses: 127.0.0.1:5672 # rabbitmq 连接地址
username: rabbitmq # rabbitmq 用户
password: 123456 # rabbitmq 连接密码
virtual-host: / # 虚拟路径
bindings:
userOutPutChannel:
destination: exchange_cloud # Exchange名称,交换模式默认topic,把stream输出通道绑定到exchange_cloud交换机
group: userGroup # 分组名称,生产端、消费端名称需要一致
default-binder: defaultRabbit # 和上面定义的 binders:defaultRabbit需要一致
content-type: application/json # 设置消息类型 为json
生产端代码
展示如下:
// 定义输出类型的通道
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
final static String OUTPUT_CHANNEL = "userOutPutChannel";
/*
* @Description: 定义一个输出类型的通道
* @Author ly
* @param []
* @return org.springframework.messaging.MessageChannel
* @date 2021/3/22 17:07
*/
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel userOutPutChannel();
}
// 实现类封装
import com.alibaba.fastjson.JSON;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class RabbitSender {
@Resource
@Output(Barista.OUTPUT_CHANNEL)
private MessageChannel channel;
@Resource
private Barista barista;
/*
* @Description: channel 发送消息
* @Author ly
* @date 2021/3/23 11:12
*/
public void sendMsg (String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
log.error("channel消息发送成功:{} " + msg);
}
/*
* @Description: barista 发送消息
* @Author ly
* @date 2021/3/23 11:12
*/
public void sendMessage (Object message, Map<String, Object> properties) {
final MessageHeaders messageHeaders = new MessageHeaders(properties);
final Message msg = MessageBuilder.createMessage(message, messageHeaders);
final boolean sendResult = barista.userOutPutChannel().send(msg);
log.error("barista消息发送成功:{},sendResult:{} " + JSON.toJSONString(msg), sendResult);
}
}
// 测试类处理
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.impl.RabbitSender;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ECloudProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class ECloudProducerApplicationTests {
@Resource
private RabbitSender rabbitSender;
@Test
public void sendMessage () {
final String message = "Hello RabbitMQ";
rabbitSender.sendMsg(message);
}
@Test
public void sendRabbitMessage () {
final Map<String, Object> properties = ImmutableMap.of("cloud-stream", "cloud-stream");
final User user = new User("simon","123456", 22, new BigDecimal(100));
rabbitSender.sendMessage(user, properties);
}
}
消费端代码
消费端配置说明
spring:
cloud:
stream:
binders:
defaultRabbit: # 此配置为rabbitmq配置说明
type: rabbit
environment:
spring:
rabbitmq:
addresses: 127.0.0.1:5672
username: rabbit
password: 123456
virtual-host: /
bindings:
userInChannel: # 定义输入管道名称
destination: exchange_cloud # 交换模式默认是topic,把stream的消息输出通道绑定到exchange_cloud交换器
group: userGroup # 分组名称与生产端名称一致
content-type: application/json # 消费端消息类型 json
default-binder: defaultRabbit # 与binders:defaultRabbit名称一致 环境名称
consumer:
concurrency: 1 # 默认监听数量
rabbit:
bindings:
userInChannel: # 和bindings:userInChannel 名称一致
consumer: # 消费端配置
requeue-reject: false # 是否支持return
acknowledge-mode: MANUAL # 签收模式 手动签收
recovery-interval: 3000 # 3s重新连接
durable-subscription: true # 是否启动持久化订阅
max-concurrency: 5 # 最大监听数量
消费端代码
// 定义一个输入通道
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
String INPUT_CHANNEL = "userInChannel";
/*
* @Description: 定义一个输入类型的通道
* @Author ly
* @param []
* @return org.springframework.messaging.SubscribableChannel
* @date 2021/3/22 17:34
*/
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel userInChannel();
}
// 定义输入通道监听类
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class MQReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
log.error("消费完毕:{}, Object:{}", System.currentTimeMillis(), JSON.toJSON(message));
channel.basicAck(deliveryTag, false);
}
}
接收消息返回信息示例
对象消息返回信息示例
本章完结,后续还会持续更新,分享Java成长笔记,希望我们能一起成长。如果你觉得我的分享有用,记得点赞和关注哦!这对我是最好的鼓励。谢谢!
PS:转载请注明出处!