SpringBoot AJAX 长轮询实现简单即时通讯

先来看看需要实现的效果

// 客户端A
> IM.subscribe('频道1')
> IM.subscribe('频道2')
< undefined
订阅 [频道1] 成功

// 客户端B 发送了消息
> IM.send('频道1','hello world')
// 客户端C 短时间内向 [频道1] 和 [频道2] 发送了多条消息 “频道1message1”-"频道1message10",“频道2message1”-"频道2message10"
for(...){IM.send('频道1','....')};for(...){IM.send('频道2','....')}
发送成功

// 客户端A收到消息
[{"text":"hello world"}]
// 客户端A收到消息
[{"text":"频道1message1"},{"text":"频道1message2"}, ...... 
{"text":"频道1message10"},{"text":"频道2message1"}, ...... 
{"text":"频道2message10"}]

//取消订阅 频道2
> IM.unsubscribe('频道2')

需要实现

  1. 订阅频道
  2. 取消订阅
  3. 发送消息
  4. 接收消息

首先,我们需要使用 org.springframework.web.context.request.async.DeferredResult 来做消息的异步返回。
关于DeferredResult的用法,不做详细介绍,可以看看这个简单的例子:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.LinkedList;
import java.util.List;

@RestController
@RequestMapping("/test/deferredResult")
public class DeferredResultController {
    //DeferredResult存储在队列中,若同时有多个request1接收到时,request2可以把消息发送给所有的request1请求
    private final List<DeferredResult<String>> queue = new LinkedList<>();

    @RequestMapping("request1")
    public DeferredResult<String> request1(){
        DeferredResult<String> result = new DeferredResult<>(10000L); //设置过期时间 10000ms
        result.onTimeout(()->result.setResult("deferredResult已过期"));
        synchronized (queue){
            queue.add(result);
        }
        return result;
    }

    @RequestMapping("request2")
    public String request2(String message){
        synchronized (queue){
            queue.forEach(deferredResult -> {
                deferredResult.setResult(message);
                queue.remove(deferredResult);
            });
        }
        return "success";
    }

}
  1. 向request1/test/deferredResult/request1发送请求
  2. 向request1/test/deferredResult/request2发送请求,data为{"message":"hello world"}。此时request1、request2分别返回了值"hello world","success"
  3. 若过期时间内没有请求request2 返回消息 "deferredResult已过期"

这就是通过长轮询接收消息的基本方式:发送请求到服务器,服务器收到请求后保持请求,当收到消息时通过此请求返回消息。


定义客户端和频道这两个类:

//客户端
@Data
public class IMClient{
    String id;
    String name;
    long saveTime;
    /**
     * 过期时间 ms,<=0即不过期
     */
    int expire;
}
//频道
@Data
public class Channel {
    String name;
    Set<IMClient> subscriptionSet;
}

我们在客户端通过循环调用的方式-收到返回的结果时再次发送请求,来保证服务器一直手握着客户端的请求,而且对于一个客户端至多只有一个请求,节省资源。


通过jquery实现的代码如下

function poll(){
    $.ajax({
        url:"/im/poll",
        type: "POST",
        success: function (data) {
            console.log(JSON.stringify(data));
            setTimeout(poll,500);//给点延时 减少请求的频率
        },
        error: function (err) {
            console.log(JSON.stringify(err));
            setTimeout(poll,5000);
        }
    });
}

然而 ,在服务器端返回请求后直到收到客户端下一个请求之前的这段时间,服务器并没有维持着客户端的请求。这段时间内发送给服务器的消息将被丢失,因此我们需要一个消息队列来保存未发送的消息:

通过在收到消息和收到轮询请求时都调用flush方法,消息可以尽可能即时地发送给客户端 -- 当DeferredResult可用时,收到消息立刻返回,当DeferredResult不可用时,在收到轮询请求后立刻返回消息。

import lzlz000.entity.CommonMessage;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class IMMessageQueue {
    private DeferredResult<List<CommonMessage>> result;
    //使用LinkedList作为消息队列
    private final LinkedList<CommonMessage> messageQueue = new LinkedList<>();

    public synchronized void send(CommonMessage message){
        messageQueue.add(message);
        flush();
    }
    public DeferredResult<List<CommonMessage>> poll(){
        result = new DeferredResult<>(10000L);
        flush();
        result.onTimeout(()->result.setResult(null));
        return result;
    }
/**
 * flush()方法会在DeferredResult可用(非空且未被使用)时把消息发送出去,在send和poll时都会执行flush(),
 * 这样无论什么情况下消息最终都会被发送出去
 */
    private synchronized void flush(){
        if (result!=null&&!result.hasResult()&&messageQueue.size()>0) {
            //这里需要拷贝一份消息,因为此处为异步调用,而在当前线程中,messageQueue的引用随后将被clear()
            result.setResult(new ArrayList<>(messageQueue));
            messageQueue.clear();
        }
    }
}

接下来就是要维护频道和客户端了,我通过ClientService 和 ChannelService来实现

import lzlz000.entity.im.IMClient;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpSession;
import java.util.Date;
import java.util.UUID;

@Service
public class ClientService {
    private final int expire = 600 * 1000;//user的过期时间 600s没有活动 频道中的user自动过期

    /**
     * 过期时间间隔
     * @return 过期时间间隔(ms)
     */
    public int getExpire(){
        return expire;
    }
    /**
     * 获取默认过期时间的IMClient 
     * @param session httpSession
     */
    public IMClient getIMClient(HttpSession session){
        return getIMClient(session,this.expire);
    }

    /**
     * 获取指定过期时间的IMClient 
     * @param session httpSession
     * @param expire 过期时间,<=0代表不会过期
     */
    private IMClient getIMClient(HttpSession session, int expire){
        IMClient client =(IMClient)session.getAttribute("imUser");
        if(client != null&&client.getSaveTime()<= new Date().getTime()){//如果过期 则从session中删除用户
            session.removeAttribute("imUser");
            client = null;
        }
        if (client == null) {
            client = new IMClient();//测试时client的作用仅仅是作为map的key所以new一个即可
            client.setId(UUID.randomUUID().toString());
            client.setSaveTime(new Date().getTime()+expire);//设置过期时间
            session.setAttribute("imUser",client);
        }
        client.setExpire(expire);
        return client;
    }

    public boolean isExpired(IMClient client) {
        return client.getExpire() > 0 && client.getSaveTime() <= new Date().getTime();
    }
}

ChannelService 实现订阅、取消订阅、发送、接收的方法

import lzlz000.entity.im.Channel;
import lzlz000.entity.CommonMessage;
import lzlz000.entity.im.IMClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Service
public class ChannelService implements IChannelService{
    private final ClientService clientService;

    //客户端ID与消息队列映射
    private final Map<String,IMMessageQueue> resultMap = new ConcurrentHashMap<>();
    //客户端ID与频道的映射
    private final Map<String,Channel> channelMap = new ConcurrentHashMap<>();

    @Autowired
    public ChannelService(ClientService imUserService) {
        this.clientService = imUserService;
    }

    public void subscribe(String channelName, IMClient client) {
        Channel channel = channelMap.get(channelName);
        if (channel==null) {
            channel = new Channel(channelName,new HashSet<>());
            channelMap.put(channelName,channel);
        }
        channel.getSubscriptionSet().add(client);
    }

    public void unsubscribe(String channelName, IMClient client) {
        Channel channel = channelMap.get(channelName);
        if (channel==null) {
            return;
        }
        Set subscriptionSet = channel.getSubscriptionSet();
        subscriptionSet.remove(client);
        if(subscriptionSet.size()==0){
            channelMap.remove(channelName);
        }
    }

    public void unsubscribe(IMClient client) {
        if (client != null) {
            channelMap.values().forEach(channel -> channel.getSubscriptionSet().remove(client));
        }
    }

    public void emit(String channelName, IMClient sender , CommonMessage data) {
        sender.setSaveTime(new Date().getTime() + clientService.getExpire());//发送消息时候更新savetime
        Channel channel = channelMap.get(channelName);
        if (channel!=null) {
            //当已达到过期时间 删除此client,并且从resultMap中删除对应的消息
            channel.getSubscriptionSet().forEach(imUser -> {
                boolean isExpired = clientService.isExpired(imUser);
                if(isExpired){
                    channel.getSubscriptionSet().remove(imUser);
                    resultMap.remove(imUser.getId());
                }
            });
            channel.getSubscriptionSet().forEach(client-> send(client,data));
            //当channel中没有订阅者,删除此channel
            if( channel.getSubscriptionSet().size()==0){
                channelMap.remove(channelName);
            }
        }

    }


    public DeferredResult<List<CommonMessage>> poll(IMClient receiver){
        IMMessageQueue queue = resultMap.get(receiver.getId());
        if (queue==null) {
            queue = new IMMessageQueue();
            resultMap.put(receiver.getId(),queue);
        }
        return queue.poll();
    }


    private void send(IMClient receiver, CommonMessage message){
        IMMessageQueue queue = resultMap.get(receiver.getId());
        if (queue != null) {
            queue.send(message);
        }
    }
}

Controller

import lzlz000.entity.im.Message;
import lzlz000.service.im.ChannelService;
import lzlz000.service.im.ClientService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;

import javax.servlet.http.HttpServletRequest;
import java.util.List;

/**
 * 长轮询即时通讯Controller
 */
@RestController
@RequestMapping("im")
public class IMController {
    private final ChannelService channelService;
    private final ClientService userService;

    @Autowired
    public IMController(ChannelService channelService, ClientService userService) {
        this.channelService = channelService;
        this.userService = userService;
    }

    //长轮询
    @PostMapping("poll")
    @ResponseBody
    public DeferredResult<List<CommonMessage>> poll(HttpServletRequest req){
        return channelService.poll(userService.getIMClient(req.getSession()));
    }

    //订阅
    @PostMapping("subscribe")
    public String subscribe(HttpServletRequest req,Message channel){
        channelService.subscribe(channel.getChannel(),userService.getIMClient(req.getSession()));//测试时User的作用仅仅是作为map的key所以new一个即可
        return "订阅: "+channel.getChannel();
    }
    //取消订阅
    @PostMapping("unsubscribe")
    public String unsubscribe(HttpServletRequest req,Message channel){
        if (channel != null&&channel.getChannel()!=null) {
            channelService.unsubscribe(channel.getChannel(),userService.getIMClient(req.getSession()));//测试时User的作用仅仅是作为map的key所以new一个即可
            return "取消订阅:"+channel.getChannel();
        }else{
            channelService.unsubscribe(userService.getIMClient(req.getSession()));//测试时User的作用仅仅是作为map的key所以new一个即可
            return "取消订阅全部频道";
        }
    }

    // 似乎识别不了内部的自定义对象CommonMessage,需要HTTP请求设置contentType: "application/json",
    // 把json转化为字符串传给data,并且此方法设置@RequestBody
    @PostMapping("emit")
    public String emit(HttpServletRequest req, @RequestBody Message msg){
        msg.setSender(userService.getIMClient(req.getSession()));
        channelService.emit(msg.getChannel(),msg.getSender(),msg.getMessage());
        return "发送成功";
    }
}

最后把controller提供的方法在封装在js中

/**
 * 长轮询即时通讯
 */
var IM =(function(jQuery){
    var $ =jQuery;
    return {
        poll : function() {
            $.ajax({
                url:"/im/poll",
                type: "POST",
                success: function (data) {
                    console.log(JSON.stringify(data));
                    IM.poll();
                },
                error: function (err) {
                    console.log(JSON.stringify(err));
                    setTimeout(IM.poll,2000);
                }
            });
        },

        subscribe : function(channel) {
            $.post('/im/subscribe',{
                channel:channel
            },function (e) {
                console.log(e);
            })
        },
        unsubscribe : function (channelName) {
            var channel = channelName?{channel:channelName}:null;
            $.post('/im/unsubscribe',channel,function (e) {
                console.log(e);
            })
        },
        send : function(channel,text) {
            $.ajax({
                url:"/im/emit",
                type: "POST",
                data: JSON.stringify({
                    channel:channel,
                    message:{
                        text:text
                    }
                }),
                success: function (e) {
                    console.log(e);
                },
                dataType: "json",
                contentType: "application/json"
            });
        }
    };
})(jQuery);

至此,可实现文章开头所说的效果


这其实就是一种 发布订阅模式
客户端既是订阅者(Subscriber)又是发布者(Publisher)
但是http请求是客户端主动向服务器发送请求获取返回,发布订阅模式是被动的接收被订阅频道的通知。
因此我们只能把客户端的请求存储在服务器中(请求存储在DeferredResult对象中),当服务器数据发生改变时,通知订阅者(返回请求)。


为了保证数据的及时性以及完整性,获取消息-->返回消息-->获取消息 循环执行(即长轮询),而服务器对应频道的对应客户端有一个队列保存着未发送的消息,在收到获取消息的请求或者发送消息的请求时都会尝试返回消息

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,652评论 18 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,965评论 6 13
  • 独对孤灯, 我深深想你于心间, 独坐窗前, 静对细雨, 我想你于心田, 独行湖边,面对孤荷, 我想你于腮边, 独立...
    素颜之爱情独角戏阅读 85评论 0 0
  • 对于江南的神往,缘于诗词歌赋……江南好,风景旧曾谙,日出江花红胜火,春来江水绿如蓝,能不忆江南?喜爱郑少秋主演的电...
    严蹊阅读 883评论 6 11
  • 我觉得我肯定被上帝亲吻过,不然何德何能有廖兄这样一个至交? 人生如梦,白云苍狗。时间就像酒杯里的酒,一晃,5年就被...
    小姜同学阅读 362评论 2 1