消息队列的需求
1.消息保序
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
2.处理重复的消息
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
3.保证消息可靠性
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
基于Streams实现
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。
XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
XREAD:用于读取消息,可以按 ID 读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,
XACK:XACK 命令用于向消息队列确认消息处理已完成。
git
实现代码源码
https://gitee.com/gnliscream/redis-mq
引入依赖
<redis-mq.version>1.0-SNAPSHOT</redis-mq.version>
<dependency>
<groupId>vip.gnloypp</groupId>
<artifactId>redis-mq-api</artifactId>
<version>${redis-mq.version}</version>
</dependency>
<dependency>
<groupId>vip.gnloypp</groupId>
<artifactId>redis-mq-api-data</artifactId>
<version>${redis-mq.version}</version>
</dependency>
生产者
配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import vip.gnloypp.redis.mq.api.config.RedisSenderConfig;
import vip.gnloypp.redis.mq.api.config.RedisSenderQueueConfig;
import vip.gnloypp.redis.mq.api.core.manager.RedisSenderManager;
@Configuration
public class EventConfig {
/**
* 注入生产者管理器
*/
@Bean
public RedisSenderManager redisSenderManager(RedisSenderConfig redisSenderConfig) {
return new RedisSenderManager(redisSenderConfig);
}
/**
* 注入生产者配置
* 需要注入自己的RedisConnectionFactory
*/
@Bean
public RedisSenderConfig redisSenderConfig(RedisConnectionFactory redisConnectionFactory) {
return new RedisSenderConfig()
// redis链接工厂
.setRedisConnectionFactory(redisConnectionFactory)
// 环境
.setEnv("dev")
// 日志打印的前缀
.setLogPrefix("【MGMT】")
// 生产者配置
.setRedisSenderQueueConfigs(
new RedisSenderQueueConfig<SmsEvent>()
// 队列唯一标识
.setQueue("4d0578da9202441eb8aa2a683416c511")
// 消息类
.setEventType(SmsEvent.class)
// 队列最大长度
.setQueueMaxSize(10L)
);
}
}
发送示例
import lombok.Getter;
import lombok.Setter;
import vip.gnloypp.redis.mq.api.data.RedisMqMessage;
@Setter
@Getter
public class SmsEvent extends RedisMqMessage {
/**
* 手机号
*/
private String phone;
/**
* 验证码
*/
private String code;
}
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.constraints.NotBlank;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import vip.gnloypp.redis.mq.api.core.manager.RedisSenderManager;
@Tag(name = "短信", description = "20241104001")
@Validated
@RestController
@RequestMapping("/admin/sms")
public class SmsController {
@Autowired
private RedisSenderManager redisSenderManager;
@Operation(summary = "发送")
@GetMapping("/send")
public void send(@NotBlank String phone) {
SmsEvent smsEvent = new SmsEvent();
smsEvent.setPhone(phone);
smsEvent.setCode(String.valueOf(Math.random()).substring(3, 9));
redisSenderManager.sendMessage(smsEvent);
}
}
消费者
配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import vip.gnloypp.redis.mq.api.config.RedisReceiverConfig;
import vip.gnloypp.redis.mq.api.config.RedisReceiverQueueConfig;
import vip.gnloypp.redis.mq.api.core.manager.RedisReceiverManager;
@Configuration
public class EventConfig {
/**
* 消费者管理器
*/
@Bean
public RedisReceiverManager redisReceiverManager(RedisReceiverConfig redisReceiverConfig) {
return new RedisReceiverManager(redisReceiverConfig);
}
/**
* 消费者配置
* 需要注入自己的RedisConnectionFactory
*/
@Bean
public RedisReceiverConfig redisReceiverConfig(RedisConnectionFactory redisConnectionFactory,
SmsHandler smsHandler) {
return new RedisReceiverConfig()
// redis链接工厂
.setRedisConnectionFactory(redisConnectionFactory)
// 环境
.setEnv("dev")
// 日志打印的前缀
.setLogPrefix("【BASIC】")
// 消费者配置
.setRedisReceiverQueueConfigs(
new RedisReceiverQueueConfig<SmsEvent>()
// 消费者组
.setGroup("group1")
// 消费者
.setConsumer("consumer1")
// 队列唯一标识
.setQueue("4d0578da9202441eb8aa2a683416c511")
// 消息类
.setEventType(SmsEvent.class)
// 消费处理类
.setReceiveHandler(smsHandler)
// 每次拉取消息数
.setBatchSize(10)
// 重试间隔(秒)
.setRetryTimeout(30L)
// 消息隐藏时间(秒)
.setMinIdleTime(20L)
// 重试次数
.setRetryMaxCount(5L)
);
}
}
接收示例
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import vip.gnloypp.redis.mq.api.core.handler.ReceiveHandler;
@Log4j2
@Component
public class SmsHandler implements ReceiveHandler<SmsEvent> {
@Autowired
private SmsClient smsClient;
@Override
public boolean receive(SmsEvent message) {
SendSmsRequest request = new SendSmsRequest();
request.setPhoneNumbers(message.getPhone());
request.setCode(message.getCode());
smsClient.send(request);
return true;
}
}
测试
生产者启动
消费者启动
发送消息
接收消息
基于List实现
消息保序解决方案
List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
处理重复的消息解决方案
生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。
保证消息可靠性解决方案
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。
提高消费消息性能
在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。
消费者实现
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Component
public class RedisQueueTask {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// @PostConstruct 该注解被用来修饰一个非静态的void方法。该方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
@PostConstruct
public void consumeMessage() {
// 创建一个单线程的执行器
ExecutorService executorService = Executors.newSingleThreadExecutor();
// 放入循环消费消息的任务
executorService.execute(() -> {
// 存储已处理过的消息id,这里不考虑历史id占用内存大小问题
HashSet<String> processed = new HashSet<>();
// 循环出队
while (true) {
try {
// 读取备份list,存在则先消费备份list
HashMap<String, String> map = (HashMap<String, String>) redisTemplate.opsForList()
.rightPop("backup-mp");
if (map == null) {
// 使用BRPOP命令进行阻塞式读取,这里没有读到队列数据时阻塞10s,超时或者读取到数据后,再开始下一轮读取
// 同时将map存入备份list中,防止由于故障或宕机而造成消息丢失
map = (HashMap<String, String>) redisTemplate.opsForList()
.rightPopAndLeftPush("messageQueue", "backup-mp", 10, TimeUnit.SECONDS);
}
// 无消息时,开始下一轮
if (map == null) {
continue;
}
// 重复消息直接丢弃
if (processed.contains(map.get("messageId"))) {
// 备份list出队
redisTemplate.opsForList().rightPop("backup-mp");
continue;
}
// 进行业务处理
System.out.println("消费消息:" + JSON.toJSONString(map));
// 记录已消费的messageId
processed.add(map.get("messageId"));
// 备份list出队
redisTemplate.opsForList().rightPop("backup-mp");
} catch (Exception e) {
// 异常捕获,防止循环因异常停止
e.printStackTrace();
}
}
});
}
}
生产者实现
import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.HashMap;
import java.util.List;
@SpringBootTest
public class MyTest {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
/**
* redis 发送消息
*/
@Test
void test2() {
for (int i = 0; i < 10; i++) {
HashMap<String, String> map = new HashMap<>();
// 生产者设置消息id,用来防止消费者重复消费
map.put("messageId", String.valueOf(i));
map.put("userId", "user-id-" + i);
// 发送消息
redisTemplate.opsForList().leftPush("messageQueue", map);
System.out.println("发送消息:" + JSON.toJSONString(map));
// 制造 messageId 为 6 7 8 9 的重复消息
if (i > 5) {
redisTemplate.opsForList().leftPush("messageQueue", map);
System.out.println("发送消息:" + JSON.toJSONString(map));
}
}
}
}
测试结果
发送消息日志
消费消息日志