分布式websocket解决方案

前景

我们在做网页聊天的话,基本上都是用到websocket去做。如果单台服务器难以支撑的话,我们就会考虑加机器,做成分布式,做成集群模式。这个时候就会出现关于分布式 session 的问题,也就是说多台fd 连接问题。

单机架构

图片.png

从上面的箭头我们就可以看出,双向是可以通信的。可以来回发送信息的。

分布式架构

图片.png
  • 假如 client1 想要单独发送私聊的信息给 client4,这个时候怎么办?

  • 假如 client1 想要发送群聊给所有的客户端,这个时候怎么办?

  • 假如 后台管理人家,先要发送系统消息广播给所有人。这个时候怎么办?

模拟

nginx.conf

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server 127.0.0.1:9511;
    server 127.0.0.1:9522;
}

server {
    listen 80;
    server_name www.abc.com;
    location / {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

ws1.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9511);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判断是否是正确的websocket连接,否则有可能会push失败
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

ws2.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9522);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判断是否是正确的websocket连接,否则有可能会push失败
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

websocket测试工具

图片.png
图片.png

二个客户端连接,都发送了消息,但是都收不到彼此的消息,我们上面的代码是for循环去push的,肯定是fd不在服务上才会这样的。这个完全就符合我们之前的设想。那么我们如何才能做到相互收到信息呢?现在开始进入我们的主题。

方案

  • Redis

  • MQ (以RabbitMQ为例)

Redis 主要的作用用于存取用户与服务器的关系,MQ 的话主要用于多个服务器的通讯和消息共享问题。

图片.png
图片.png

每个服务端都订阅自己的 queue 进行消费。比如:client1 想要发送消息给 client4, 这个时候的步骤如下:

  • client1 发送消息给 client4,先从 Redis 中获取 client4 的fd与服务器关系。

  • 拿到关系后,直接push到 MQ中,{"queueName":"queue2","toId":"1","fd":"1"...} 让消费者监听处理。

  • 消费者监听某个 queue ,进行消费处理。

代码

ws01.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws01";
$port = 9511;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 发送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一条
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服务,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服务推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 发消息,主要用于其他的服务调用,mq的消费者过来的时数据
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

ws02.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws02";
$port = 9512;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 发送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一条
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服务,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服务推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 发消息,主要用于其他的服务调用,mq的消费者过来的时数据
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

consumer.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 11:50 上午
 */


require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();

$channel->exchange_declare('messages', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'messages', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    // $msg->body
    $data  = json_decode( $msg->body,true);
    Co\run(function () use($data) {
        $client = new Swoole\Coroutine\Http\Client($data["host"], $data["port"]);
        $ret = $client->upgrade("/");
        if ($ret) {
            $arr = [
                "type"=>"message",
                "fd"=>$data["fd"],
                "content"=>$data["content"]
            ];
            $client->push(json_encode($arr));
        }
    });
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

index.html

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <meta http-equiv="content-type" content="text/html;charset=utf-8">
    <style>
        p {
            text-align: left;
            padding-left: 20px;
        }
    </style>
</head>
<body>
<div style="width: 600px;height: 600px;margin: 30px auto;text-align: center">
   
    <div style="width: 600px;border: 1px solid gray;height: 300px;">
        <div id="msg_list" style="width:600px;height: 300px;overflow: scroll;float: left;">
        </div>
    </div>
    <br>
    <div style="width: 600px;height: 200px;text-align: left;">
        用户名:<input type="text" name="touser" id="touser" placeholder="请输入发送的用户名">
        <br>
        内容:<textarea id="msg_box" rows="6" cols="50"></textarea><br>
        <input type="button" value="发送" onclick="send()">
    </div>
</div>
</body>
</html>
<script type="text/javascript">
    var uname = prompt('请输入用户名', 'user' + uuid(8, 16));
    var ws = new WebSocket("ws://www.abc.com");
    ws.onopen = function () {
        var data = "系统消息:建立连接成功";
        listMsg(data);
    };

    ws.onmessage = function (e) {
        var msg = JSON.parse(e.data);
        var user_name, name_list, change_type;
        switch (msg.type) {
            case 'system':
                sender = '系统消息: ';
                break;
            case 'open':
                var user_info = {'type': 'login', 'userName': uname};
                sendMsg(user_info);
                return;
        }
        var data = msg.content;
        listMsg(data);
    };

    ws.onerror = function () {
        var data = "系统消息 : 出错了,请退出重试.";
        listMsg(data);
    };


    // 提交发送
    function send() {
        // 内容
        var msg_box = document.getElementById("msg_box");
        var content = msg_box.value;
        // 用户
        var touser = document.getElementById("touser");
        var touser = touser.value;

        var reg = new RegExp("\r\n", "g");
        content = content.replace(reg, "");
        var msg = {'content': content.trim(), 'type': 'sendto','from':uname,'to':touser.trim()};
        sendMsg(msg);
        msg_box.value = '';
    }

    // 消息列表
    function listMsg(data) {
        var msg_list = document.getElementById("msg_list");
        var msg = document.createElement("p");
        msg.innerHTML = data;
        msg_list.appendChild(msg);
        msg_list.scrollTop = msg_list.scrollHeight;
    }

    // 发送消息    
    function sendMsg(msg) {
        var data = JSON.stringify(msg);
        console.log(data)
        ws.send(data);
    }

    // 用户uuid
    function uuid(len, radix) {
        var chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
        var uuid = [], i;
        radix = radix || chars.length;
        if (len) {
            for (i = 0; i < len; i++) uuid[i] = chars[0 | Math.random() * radix];
        } else {
            var r;
            uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
            uuid[14] = '4';
            for (i = 0; i < 36; i++) {
                if (!uuid[i]) {
                    r = 0 | Math.random() * 16;
                    uuid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
                }
            }
        }
        return uuid.join('');
    }
</script>

运行

php ws01.php #启动ws01
php ws02.php #启动ws02
php consumer.php "ws01"  #监听ws1 queue
php consumer.php "ws02"  #监听ws2 queue

效果

客户1

图片.png

客户2

图片.png

客户3

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容