Redis 实现 消息队列

消息队列的需求

1.消息保序

虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

2.处理重复的消息

消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

3.保证消息可靠性

消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

基于Streams实现

Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

XADD:插入消息,保证有序,可以自动生成全局唯一 ID;

XREAD:用于读取消息,可以按 ID 读取数据;

XREADGROUP:按消费组形式读取消息;

XPENDING :XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,

XACK:XACK 命令用于向消息队列确认消息处理已完成。

git

实现代码源码
https://gitee.com/gnliscream/redis-mq

引入依赖

<redis-mq.version>1.0-SNAPSHOT</redis-mq.version>

<dependency>
    <groupId>vip.gnloypp</groupId>
    <artifactId>redis-mq-api</artifactId>
    <version>${redis-mq.version}</version>
</dependency>
<dependency>
    <groupId>vip.gnloypp</groupId>
    <artifactId>redis-mq-api-data</artifactId>
    <version>${redis-mq.version}</version>
</dependency>

生产者

配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import vip.gnloypp.redis.mq.api.config.RedisSenderConfig;
import vip.gnloypp.redis.mq.api.config.RedisSenderQueueConfig;
import vip.gnloypp.redis.mq.api.core.manager.RedisSenderManager;

@Configuration
public class EventConfig {

    /**
     * 注入生产者管理器
     */
    @Bean
    public RedisSenderManager redisSenderManager(RedisSenderConfig redisSenderConfig) {
        return new RedisSenderManager(redisSenderConfig);
    }

    /**
     * 注入生产者配置
     * 需要注入自己的RedisConnectionFactory
     */
    @Bean
    public RedisSenderConfig redisSenderConfig(RedisConnectionFactory redisConnectionFactory) {
        return new RedisSenderConfig()
                // redis链接工厂
                .setRedisConnectionFactory(redisConnectionFactory)
                // 环境
                .setEnv("dev")
                // 日志打印的前缀
                .setLogPrefix("【MGMT】")
                // 生产者配置
                .setRedisSenderQueueConfigs(
                        new RedisSenderQueueConfig<SmsEvent>()
                                // 队列唯一标识
                                .setQueue("4d0578da9202441eb8aa2a683416c511")
                                // 消息类
                                .setEventType(SmsEvent.class)
                                // 队列最大长度
                                .setQueueMaxSize(10L)
                );
    }

}

发送示例

import lombok.Getter;
import lombok.Setter;
import vip.gnloypp.redis.mq.api.data.RedisMqMessage;

@Setter
@Getter
public class SmsEvent extends RedisMqMessage {

    /**
     * 手机号
     */
    private String phone;
    
    /**
     * 验证码
     */
    private String code;

}
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.validation.constraints.NotBlank;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import vip.gnloypp.redis.mq.api.core.manager.RedisSenderManager;

@Tag(name = "短信", description = "20241104001")
@Validated
@RestController
@RequestMapping("/admin/sms")
public class SmsController {

    @Autowired
    private RedisSenderManager redisSenderManager;

    @Operation(summary = "发送")
    @GetMapping("/send")
    public void send(@NotBlank String phone) {
        SmsEvent smsEvent = new SmsEvent();
        smsEvent.setPhone(phone);
        smsEvent.setCode(String.valueOf(Math.random()).substring(3, 9));
        redisSenderManager.sendMessage(smsEvent);
    }

}

消费者

配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import vip.gnloypp.redis.mq.api.config.RedisReceiverConfig;
import vip.gnloypp.redis.mq.api.config.RedisReceiverQueueConfig;
import vip.gnloypp.redis.mq.api.core.manager.RedisReceiverManager;

@Configuration
public class EventConfig {

    /**
     * 消费者管理器
     */
    @Bean
    public RedisReceiverManager redisReceiverManager(RedisReceiverConfig redisReceiverConfig) {
        return new RedisReceiverManager(redisReceiverConfig);
    }

    /**
     * 消费者配置
     * 需要注入自己的RedisConnectionFactory
     */
    @Bean
    public RedisReceiverConfig redisReceiverConfig(RedisConnectionFactory redisConnectionFactory,
                                                   SmsHandler smsHandler) {
        return new RedisReceiverConfig()
                // redis链接工厂
                .setRedisConnectionFactory(redisConnectionFactory)
                // 环境
                .setEnv("dev")
                // 日志打印的前缀
                .setLogPrefix("【BASIC】")
                // 消费者配置
                .setRedisReceiverQueueConfigs(
                        new RedisReceiverQueueConfig<SmsEvent>()
                                // 消费者组
                                .setGroup("group1")
                                // 消费者
                                .setConsumer("consumer1")
                                // 队列唯一标识
                                .setQueue("4d0578da9202441eb8aa2a683416c511")
                                // 消息类
                                .setEventType(SmsEvent.class)
                                // 消费处理类
                                .setReceiveHandler(smsHandler)
                                // 每次拉取消息数
                                .setBatchSize(10)
                                // 重试间隔(秒)
                                .setRetryTimeout(30L)
                                // 消息隐藏时间(秒)
                                .setMinIdleTime(20L)
                                // 重试次数
                                .setRetryMaxCount(5L)
                );
    }

}

接收示例

import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import vip.gnloypp.redis.mq.api.core.handler.ReceiveHandler;

@Log4j2
@Component
public class SmsHandler implements ReceiveHandler<SmsEvent> {

    @Autowired
    private SmsClient smsClient;

    @Override
    public boolean receive(SmsEvent message) {
        SendSmsRequest request = new SendSmsRequest();
        request.setPhoneNumbers(message.getPhone());
        request.setCode(message.getCode());
        smsClient.send(request);
        return true;
    }

}

测试

生产者启动

消费者启动

发送消息


接收消息


基于List实现

消息保序解决方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。

处理重复的消息解决方案

生产者提供一个消息id,消费者要把已经处理过的消息 ID 号记录下来。当收到一条消息后,消费者程序就可以对比收到的消息 ID 和记录的已处理过的消息 ID,来判断当前收到的消息有没有经过处理。如果已经处理过,那么,消费者程序就不再进行处理了。

保证消息可靠性解决方案

为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。

提高消费消息性能

在生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

消费者实现

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Component
public class RedisQueueTask {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // @PostConstruct 该注解被用来修饰一个非静态的void方法。该方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
    @PostConstruct
    public void consumeMessage() {
        // 创建一个单线程的执行器
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // 放入循环消费消息的任务
        executorService.execute(() -> {

            // 存储已处理过的消息id,这里不考虑历史id占用内存大小问题
            HashSet<String> processed = new HashSet<>();

            // 循环出队
            while (true) {
                try {
                    // 读取备份list,存在则先消费备份list
                    HashMap<String, String> map = (HashMap<String, String>) redisTemplate.opsForList()
                            .rightPop("backup-mp");

                    if (map == null) {
                        // 使用BRPOP命令进行阻塞式读取,这里没有读到队列数据时阻塞10s,超时或者读取到数据后,再开始下一轮读取
                        // 同时将map存入备份list中,防止由于故障或宕机而造成消息丢失
                        map = (HashMap<String, String>) redisTemplate.opsForList()
                                .rightPopAndLeftPush("messageQueue", "backup-mp", 10, TimeUnit.SECONDS);
                    }

                    // 无消息时,开始下一轮
                    if (map == null) {
                        continue;
                    }

                    // 重复消息直接丢弃
                    if (processed.contains(map.get("messageId"))) {
                        // 备份list出队
                        redisTemplate.opsForList().rightPop("backup-mp");
                        continue;
                    }

                    // 进行业务处理
                    System.out.println("消费消息:" + JSON.toJSONString(map));

                    // 记录已消费的messageId
                    processed.add(map.get("messageId"));

                    // 备份list出队
                    redisTemplate.opsForList().rightPop("backup-mp");

                } catch (Exception e) {
                    // 异常捕获,防止循环因异常停止
                    e.printStackTrace();
                }
            }
        });
    }

}

生产者实现

import com.alibaba.fastjson.JSON;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.HashMap;
import java.util.List;

@SpringBootTest
public class MyTest {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * redis 发送消息
     */
    @Test
    void test2() {
        for (int i = 0; i < 10; i++) {
            HashMap<String, String> map = new HashMap<>();
            // 生产者设置消息id,用来防止消费者重复消费
            map.put("messageId", String.valueOf(i));
            map.put("userId", "user-id-" + i);

            // 发送消息
            redisTemplate.opsForList().leftPush("messageQueue", map);
            System.out.println("发送消息:" + JSON.toJSONString(map));

            // 制造 messageId 为 6 7 8 9 的重复消息
            if (i > 5) {
                redisTemplate.opsForList().leftPush("messageQueue", map);
                System.out.println("发送消息:" + JSON.toJSONString(map));
            }
        }
    }

}

测试结果

发送消息日志


image

消费消息日志


image
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容

  • Redis实现轻量级的消息队列与消息中间件相比,没有高级特性也没有ACK保证,无法做到数据不重不漏,如果业务简单而...
    JunChow520阅读 39,487评论 3 11
  • 原文链接:Redis实现消息队列的方案 Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数...
    这个ID狠温柔阅读 101,235评论 2 28
  • 原文作者: xingguang原文链接:https://www.tiance.club/post/40339086...
    一直到最后12阅读 238评论 0 0
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,523评论 16 22
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,564评论 0 11