消息队列是应用中常用的一个技术点,通常我们可以借助消息队列中间件来实现,但是并不是所有的情况下,都需要使用到MQ。
- 如果只需要实现简单的消息队列,那么借助Redis即可。
- 如果对消息有着严格的可靠性等要求,那么建议使用专业的MQ.(RocketMQ,Kafka,RabbitMQ)‘
Redis实现延迟消息队列的思想
可以借助zset有序集合来实现延迟消息队列。因为zset有一个score,它是可以按这个score来进行排序的,我们可以把时间戳作为zset的score,让它按时间去排序,然后在Java程序中使用轮询或者定时任务来消费里面的消息。这里使用的是点对点的消费模式。
以下是代码的展示.
第一步,先创建一个发送消息的对象
package com.xjm.redis;
public class RedisMessage {
private String id;
private Object message;
@Override
public String toString() {
return "RedisMessage{" +
"id='" + id + '\'' +
", message=" + message +
'}';
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getMessage() {
return message;
}
public void setMessage(Object message) {
this.message = message;
}
}
第二步,引入序列化工具jackson
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.3</version>
</dependency>
第三步,编写一个消息队列的工具类,主要包含消息入队和消费功能
package com.xjm.redis;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
/**
* 延迟消息队列
*/
public class DelayMsgQueue {
private Jedis jedis;
private String queue;
public DelayMsgQueue(Jedis jedis, String queue) {
this.jedis = jedis;
this.queue = queue;
}
/**
* 消息入队,要发送的消息
* @param message
*/
public void queue(Object message){
RedisMessage redisMessage = new RedisMessage();
redisMessage.setId(UUID.randomUUID().toString());
redisMessage.setMessage(message);
try {
//序列化
String s = new ObjectMapper().writeValueAsString(redisMessage);
System.out.println("Redis发送消息:"+new Date());
//消息发送,score延迟5秒
jedis.zadd(queue,System.currentTimeMillis()+5000,s);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 消息出队(消息消费)
*
*/
public void loop(){
/**
* 轮询,线程被中断时停止
*/
while (!Thread.interrupted()){
//读取score时间在0到当前时间戳的之间的消息,一次一条
Set<String> message = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
if (message.isEmpty()){
try {
//如果消息为空,则线程休眠一段时间
Thread.sleep(500);
} catch (InterruptedException e) {
break;
}
continue;
}
//如果读取到了消息,则直接加载
String next = message.iterator().next();
if(jedis.zrem(queue,next)>0){
//抢到了,接下来处理业务
try {
RedisMessage redisMessage = new ObjectMapper().readValue(next, RedisMessage.class);
System.out.println("抢到了!"+new Date());
System.out.println(redisMessage.toString());
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
}
}
第四步,测试
package com.xjm.redis;
public class DelayMsgTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.exeute(jedis -> {
//构造一个消息队列
DelayMsgQueue delayMsgQueue = new DelayMsgQueue(jedis, "jaymin-delay-queue");
Runnable producer = new Runnable() {
@Override
public void run() {
for (int i=0;i<5;i++){
delayMsgQueue.queue("Java>>>>>"+i);
}
}
};
Thread producerThread = new Thread(producer);
Runnable customer = new Runnable() {
@Override
public void run() {
delayMsgQueue.loop();
}
};
Thread customerThread = new Thread(customer);
producerThread.start();
customerThread.start();
try {
//休息7秒后,停止程序
Thread.sleep(7000);
customerThread.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
测试结果如下:
在此鸣谢@江南一点雨,感谢他的redis视频解析.