分布式websocket解决方案

前景

我们在做网页聊天的话,基本上都是用到websocket去做。如果单台服务器难以支撑的话,我们就会考虑加机器,做成分布式,做成集群模式。这个时候就会出现关于分布式 session 的问题,也就是说多台fd 连接问题。

单机架构

图片.png

从上面的箭头我们就可以看出,双向是可以通信的。可以来回发送信息的。

分布式架构

图片.png
  • 假如 client1 想要单独发送私聊的信息给 client4,这个时候怎么办?

  • 假如 client1 想要发送群聊给所有的客户端,这个时候怎么办?

  • 假如 后台管理人家,先要发送系统消息广播给所有人。这个时候怎么办?

模拟

nginx.conf

map $http_upgrade $connection_upgrade {
    default upgrade;
    '' close;
}

upstream websocket {
    server 127.0.0.1:9511;
    server 127.0.0.1:9522;
}

server {
    listen 80;
    server_name www.abc.com;
    location / {
        proxy_pass http://websocket;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
    }
}

ws1.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9511);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判断是否是正确的websocket连接,否则有可能会push失败
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

ws2.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */

$server = new Swoole\WebSocket\Server("0.0.0.0", 9522);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd{$request->fd}\n";
    $server->push($request->fd, "hello, welcome\n");
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}\n";
    foreach ($server->connections as $fd) {
        // 需要先判断是否是正确的websocket连接,否则有可能会push失败
        if ($server->isEstablished($fd)) {
            $server->push($fd, $frame->data);
        }
    }
});

$server->on('close', function ($ser, $fd) {
    echo "client {$fd} closed\n";
});

$server->start();

websocket测试工具

图片.png
图片.png

二个客户端连接,都发送了消息,但是都收不到彼此的消息,我们上面的代码是for循环去push的,肯定是fd不在服务上才会这样的。这个完全就符合我们之前的设想。那么我们如何才能做到相互收到信息呢?现在开始进入我们的主题。

方案

  • Redis

  • MQ (以RabbitMQ为例)

Redis 主要的作用用于存取用户与服务器的关系,MQ 的话主要用于多个服务器的通讯和消息共享问题。

图片.png
图片.png

每个服务端都订阅自己的 queue 进行消费。比如:client1 想要发送消息给 client4, 这个时候的步骤如下:

  • client1 发送消息给 client4,先从 Redis 中获取 client4 的fd与服务器关系。

  • 拿到关系后,直接push到 MQ中,{"queueName":"queue2","toId":"1","fd":"1"...} 让消费者监听处理。

  • 消费者监听某个 queue ,进行消费处理。

代码

ws01.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws01";
$port = 9511;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 发送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一条
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服务,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服务推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 发消息,主要用于其他的服务调用,mq的消费者过来的时数据
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

ws02.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 上午
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws02";
$port = 9512;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);
$redis = new Redis;
$redis->connect("redis",6379);

$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type"=>"open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use($redis,$host,$port) {

    $data = $frame->data;
    $data = json_decode($data,true);

    
    switch ($data["type"]) {
        case 'login':
            $redis->set($data["userName"],json_encode(["fd"=>$frame->fd,"host"=>$host,"port"=>$port,"userName"=>$data["userName"]]),600);
            $redis->set($host.':'.$frame->fd,$data["userName"]);
            $server->push($frame->fd, json_encode(["type"=>"message","content"=>"hello, welcome ".$data['userName']]));
            break;
        case 'sendto':
            // 接收者
            $to = $redis->get($data["to"]);
            $to = json_decode($to,true);
            
            // 发送者
            $from = $redis->get($data["from"]);
            $from = json_decode($from,true);

            // 首先自己推送一条
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 本服务,不同人
            if($data["from"] != $data["to"] && $server->exist($to["fd"]) && $server->isEstablished($to["fd"])){
                $server->push($to["fd"], json_encode(["type"=>"message","content"=>$data["content"]]));
            }
            
            // 不在本服务推到mq
            if ($to["host"] != $host && $to && $from) {
                // 推送rabbitmq
                $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                $channel = $connection->channel();
                $channel->exchange_declare('messages', 'topic', false, false, false);
                $msg = new AMQPMessage(json_encode(["to"=>$to["userName"],"content"=> $data["content"],"fd"=>$to["fd"],"host"=>$to["host"],"port"=>$to["port"]]));
                $channel->basic_publish($msg, 'messages', $to["host"]);
                $channel->close();
                $connection->close();
                break;
            }
            break;
        case 'message':
            // 发消息,主要用于其他的服务调用,mq的消费者过来的时数据
            if ($server->exist($data["fd"]) && $server->isEstablished($data["fd"])) {
                $server->push($data["fd"], json_encode(["type"=>"message","content"=>$data["content"]])); 
            }
            break;
    }
    
});

$server->on('close', function ($ser, $fd) use($redis) {
    $userName = $redis->get($fd);
    $redis->del($userName);
    $redis->del($host.":".$fd);
    echo "client {$fd} closed\n";
});

$server->start();

consumer.php

<?php
/***
 * User: gan
 * Date: 2019/11/2
 * Time: 11:50 上午
 */


require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();

$channel->exchange_declare('messages', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'messages', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    // $msg->body
    $data  = json_decode( $msg->body,true);
    Co\run(function () use($data) {
        $client = new Swoole\Coroutine\Http\Client($data["host"], $data["port"]);
        $ret = $client->upgrade("/");
        if ($ret) {
            $arr = [
                "type"=>"message",
                "fd"=>$data["fd"],
                "content"=>$data["content"]
            ];
            $client->push(json_encode($arr));
        }
    });
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

index.html

<!DOCTYPE html>
<html>
<head>
    <title></title>
    <meta http-equiv="content-type" content="text/html;charset=utf-8">
    <style>
        p {
            text-align: left;
            padding-left: 20px;
        }
    </style>
</head>
<body>
<div style="width: 600px;height: 600px;margin: 30px auto;text-align: center">
   
    <div style="width: 600px;border: 1px solid gray;height: 300px;">
        <div id="msg_list" style="width:600px;height: 300px;overflow: scroll;float: left;">
        </div>
    </div>
    <br>
    <div style="width: 600px;height: 200px;text-align: left;">
        用户名:<input type="text" name="touser" id="touser" placeholder="请输入发送的用户名">
        <br>
        内容:<textarea id="msg_box" rows="6" cols="50"></textarea><br>
        <input type="button" value="发送" onclick="send()">
    </div>
</div>
</body>
</html>
<script type="text/javascript">
    var uname = prompt('请输入用户名', 'user' + uuid(8, 16));
    var ws = new WebSocket("ws://www.abc.com");
    ws.onopen = function () {
        var data = "系统消息:建立连接成功";
        listMsg(data);
    };

    ws.onmessage = function (e) {
        var msg = JSON.parse(e.data);
        var user_name, name_list, change_type;
        switch (msg.type) {
            case 'system':
                sender = '系统消息: ';
                break;
            case 'open':
                var user_info = {'type': 'login', 'userName': uname};
                sendMsg(user_info);
                return;
        }
        var data = msg.content;
        listMsg(data);
    };

    ws.onerror = function () {
        var data = "系统消息 : 出错了,请退出重试.";
        listMsg(data);
    };


    // 提交发送
    function send() {
        // 内容
        var msg_box = document.getElementById("msg_box");
        var content = msg_box.value;
        // 用户
        var touser = document.getElementById("touser");
        var touser = touser.value;

        var reg = new RegExp("\r\n", "g");
        content = content.replace(reg, "");
        var msg = {'content': content.trim(), 'type': 'sendto','from':uname,'to':touser.trim()};
        sendMsg(msg);
        msg_box.value = '';
    }

    // 消息列表
    function listMsg(data) {
        var msg_list = document.getElementById("msg_list");
        var msg = document.createElement("p");
        msg.innerHTML = data;
        msg_list.appendChild(msg);
        msg_list.scrollTop = msg_list.scrollHeight;
    }

    // 发送消息    
    function sendMsg(msg) {
        var data = JSON.stringify(msg);
        console.log(data)
        ws.send(data);
    }

    // 用户uuid
    function uuid(len, radix) {
        var chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
        var uuid = [], i;
        radix = radix || chars.length;
        if (len) {
            for (i = 0; i < len; i++) uuid[i] = chars[0 | Math.random() * radix];
        } else {
            var r;
            uuid[8] = uuid[13] = uuid[18] = uuid[23] = '-';
            uuid[14] = '4';
            for (i = 0; i < 36; i++) {
                if (!uuid[i]) {
                    r = 0 | Math.random() * 16;
                    uuid[i] = chars[(i == 19) ? (r & 0x3) | 0x8 : r];
                }
            }
        }
        return uuid.join('');
    }
</script>

运行

php ws01.php #启动ws01
php ws02.php #启动ws02
php consumer.php "ws01"  #监听ws1 queue
php consumer.php "ws02"  #监听ws2 queue

效果

客户1

图片.png

客户2

图片.png

客户3

图片.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容