欢迎来借鉴分布式WebSocket解决方案

单体Webscoket

  • springboot版本: 2.1.1.RELEASE
  • jdk: 1.8

示例代码

  • WebsocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 接收userId
     */
    private String userName = "";

    /**
     * @Description: 连接建立成功调用的方法,成功建立之后,将用户的userName 存储到redis
     * @params: [session, userId]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        this.session = session;
        this.userName = userName;
        webSocketMap.put(userName, this);
        addOnlineCount();
        log.info("用户连接:" + userName + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * @Description: 连接关闭调用的方法
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userName)) {
            webSocketMap.remove(userName);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userName + ",当前在线人数为:" + getOnlineCount());
    }


    /**
     * @Description: 收到客户端消息后调用的方法, 调用API接口 发送消息到
     * @params: [message, session]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnMessage
    public void onMessage(String message, @PathParam("userName") String userName) {
        log.info("用户消息:" + userName + ",报文:" + message);
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("sender", this.userName);
                String receiver = jsonObject.getString("receiver");
                //传送给对应toUserId用户的websocket
                if (StringUtils.isNotBlank(receiver) && webSocketMap.containsKey(receiver)) {
                    webSocketMap.get(receiver).session.getBasicRemote().sendText(jsonObject.toJSONString());
                } else {
                    log.error("用户:" + receiver + "不在该服务器上");
                    //否则不在这个服务器上,发送到mysql或者redis
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 发布websocket消息
     * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
     *
     * @param dto
     * @return
     */
    public static void sendWebsocketMessage(ChatMsg dto) {
        if (dto != null) {
            if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                String json = JSON.toJSONString(dto);
                try {
                    webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                } catch (IOException e) {
                    log.error("消息发送异常:{}", e.toString());
                }
            } else {
                log.error("用户:" + dto.getReceiver() + ",不在线!");
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userName + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * @Description: 获取在线人数
     * @params: []
     * @return: int
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * @Description: 在线人数+1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * @Description: 在线人数-1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}
  • WebSocketConfig
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
  • 前端代码
var socket;
    var userName;
    establishConnection()
    /***建立连接*/
    function establishConnection() {
        userName = $("#sender").val();
        if (userName == '' || userName == null) {
            alert("请输入发送者");
            return;
        }
        //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
        var socketUrl = "" + window.location.protocol + "//" + window.location.host + "/client/" + userName;
        socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
        if (socket != null) {
            socket.close();
            socket = null;
        }
        socket = new WebSocket(socketUrl);
        //打开事件
        socket.onopen = function () {
            console.log("开始建立链接....")
        };
        //关闭事件
        socket.onclose = function () {
            console.log("websocket已关闭");
        };
        //发生了错误事件
        socket.onerror = function () {
            console.log("websocket发生了错误");
        };
        /**
         * 接收消息
         * @param msg
         */
        socket.onmessage = function (msg) {
            msg = JSON.parse(msg.data);
            console.log(msg);
            if (msg.msg != '连接成功') {
                $("#msgDiv").append('<p class="other">用户名:' + msg.sender + '</p><p class="chat">' + msg.msg + '</p>');
            }
        };
    }
    /**
     * 发送消息
     */
    function sendMessage() {
        var msg = $("#msg").val();
        if (msg == '' || msg == null) {
            alert("消息内容不能为空");
            return;
        }
        var receiver = $("#receiver").val();
        if (receiver == '' || receiver == null) {
            alert("接收人不能为空");
            return;
        }
        var msgObj = {
            "receiver": receiver,
            "msg": msg
        };
        $("#msgDiv").append('<p class="user">用户名:' + userName + '</p><p class="chat">' + msg + '</p>');
        try{
            socket.send(JSON.stringify(msgObj));
            $("#msg").val('');
        }catch (e) {
            alert("服务器内部错误");
        }
    }
  • 测试效果


    image.png
  • 问题
    如果两个客户端连接不在同一个服务器上,会出现什么问题?
    结果就是如下所示:
    image.png

如何解决多台客户端连接在不同服务器,互相发送消息问题!

分布式WebSocket 解决

方案一 Redis消息订阅与发布

image.png

描述:
客户端A 和客户端B 都订阅同一个Topic ,后台Websocket收到消息后,将消息发送至Redis中,同时服务端会监听该渠道内的消息,监听到消息后,会将消息推送至对应的客户端。

示例代码

  • application.yml
    主要是Redis配置
server:
  port: 8082

spring:
  thymeleaf:
    #模板的模式,支持 HTML, XML TEXT JAVASCRIPT
    mode: HTML5
    #编码 可不用配置
    encoding: UTF-8
    #内容类别,可不用配置
    content-type: text/html
    #开发配置为false,避免修改模板还要重启服务器
    cache: false
#    #配置模板路径,默认是templates,可以不用配置
    prefix: classpath:/templates
    suffix: .html

  #Redis配置
  redis:
    host: localhost
    port: 6379
    password: 123456
    timeout: 5000
  • RedisSubscriberConfig.java
/**
 * @Description 消息订阅配置类
 * @Author wxl
 * @Date 2020/3/31 13:54
 */
@Configuration
public class RedisSubscriberConfig {
    /**
     * 消息监听适配器,注入接受消息方法
     *
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(ChatMessageListener receiver) {
        return new MessageListenerAdapter(receiver);
    }
    /**
     * 创建消息监听容器
     *
     * @param redisConnectionFactory
     * @param messageListenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(TOPIC_CUSTOMER));
        return redisMessageListenerContainer;
    }
}
  • RedisUtil.java
@Component
public class RedisUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 发布
     *
     * @param key
     */
    public void publish(String key, String value) {
        stringRedisTemplate.convertAndSend(key, value);
    }
}
  • ChatMessageListener.java
/**
 * @Description 集群聊天消息监听器
 * @Author wxl
 * @Date 2020/3/29 15:07
 */
@Slf4j
@Component
public class ChatMessageListener implements MessageListener {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
        String value = valueSerializer.deserialize(message.getBody());
        ChatMsg dto = null;
        if (StringUtils.isNotBlank(value)) {
            try {
                dto = JacksonUtil.json2pojo(value, ChatMsg.class);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("消息格式转换异常:{}", e.toString());
            }
            log.info("监听集群websocket消息--- {}", value);
            WebSocketServer.sendWebsocketMessage(dto);
        }
    }
}
  • WebSocketServer
@ServerEndpoint("/client/{userName}")
@Component
@Slf4j
public class WebSocketServer {

    /**
     * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
     */
    private static int onlineCount = 0;
    /**
     * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
     */
    private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
    /**
     * 与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    private Session session;

    /**
     * 不能使用@AutoWire原因:发现注入不了redis,redis注入失败 可能是因为实例化的先后顺序吧,WebSocket先实例化了,  但是@Autowire是会触发getBean操作
     * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
     */
    private RedisUtil redisUtil = SpringUtils.getBean(RedisUtil.class);

    /**
     * 接收userId
     */
    private String userName = "";

 

    /**
     * @Description: 连接建立成功调用的方法,成功建立之后,将用户的userName 存储到redis
     * @params: [session, userId]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userName") String userName) {
        this.session = session;
        this.userName = userName;
        webSocketMap.put(userName, this);
        addOnlineCount();
        log.info("用户连接:" + userName + ",当前在线人数为:" + getOnlineCount());
    }

    /**
     * @Description: 连接关闭调用的方法
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnClose
    public void onClose() {
        if (webSocketMap.containsKey(userName)) {
            webSocketMap.remove(userName);
            //从set中删除
            subOnlineCount();
        }
        log.info("用户退出:" + userName + ",当前在线人数为:" + getOnlineCount());
    }


    /**
     * @Description: 收到客户端消息后调用的方法, 调用API接口 发送消息到
     * @params: [message, session]
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:13 PM
     */
    @OnMessage
    public void onMessage(String message, @PathParam("userName") String userName) {
        log.info("用户消息:" + userName + ",报文:" + message);
        if (StringUtils.isNotBlank(message)) {
            try {
                //解析发送的报文
                JSONObject jsonObject = JSON.parseObject(message);
                //追加发送人(防止串改)
                jsonObject.put("sender", this.userName);
                //传送给对应toUserId用户的websocket
                redisUtil.publish(TOPIC_CUSTOMER,jsonObject.toString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * 发布websocket消息
     * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
     *
     * @param dto
     * @return
     */
    public static void sendWebsocketMessage(ChatMsg dto) {
        if (dto != null) {
            if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                String json = JSON.toJSONString(dto);
                try {
                    webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                } catch (IOException e) {
                    log.error("消息发送异常:{}", e.toString());
                }
            } else {
                log.error("用户:" + dto.getReceiver() + ",不在次服务器上!");
            }
        }
    }

    /**
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("用户错误:" + this.userName + ",原因:" + error.getMessage());
        error.printStackTrace();
    }

    /**
     * @Description: 获取在线人数
     * @params: []
     * @return: int
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    /**
     * @Description: 在线人数+1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    /**
     * @Description: 在线人数-1
     * @params: []
     * @return: void
     * @Author: wangxianlin
     * @Date: 2020/5/9 9:09 PM
     */
    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }
}
  • 测试效果


    image.png

方案二 RabbitMq

采用的是基于rabbitmq的扇形分发器,消息生产者发送到指定的队列,消息消费者监听此队列的消息,
将消息推送客户端。

交换机、队列

@Configuration
public class FanoutRabbitConfig {

    /**
     *  创建三个队列 :fanout.msg
     *  将三个队列都绑定在交换机 fanoutExchange 上
     *  因为是扇型交换机, 路由键无需配置,配置也不起作用
     */
    @Bean
    public Queue queueMsg() {
        return new Queue(ConstantUtils.FANOUT_QUEUE_MSG);
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(ConstantUtils.FANOUT_EXCHANGE);
    }

    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueMsg()).to(fanoutExchange());
    }
}

消息监听

@Component
@RabbitListener(queues = ConstantUtils.FANOUT_QUEUE_MSG)
public class FanoutReceiverMsg {
    @RabbitHandler
    public void process(Map msg) throws IOException {
        if (msg !=null){
            WebSocketServer.sendMessage((String)msg.get("receiver"),msg.toString());
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,454评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,553评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,921评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,648评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,770评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,950评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,090评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,817评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,275评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,592评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,724评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,409评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,052评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,815评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,043评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,503评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,627评论 2 350

推荐阅读更多精彩内容