背景
我们在做网页聊天时,基本上都会用 WebSocket 来实现。如果单台服务器难以支撑,我们就会考虑加机器,做成分布式的集群模式。这个时候就会出现类似分布式 session
的问题,也就是客户端连接(fd)
分散在多台服务器上、彼此无法直接访问的问题。
单机架构
从上面的箭头我们就可以看出,双向是可以通信的。可以来回发送信息的。
分布式架构
假如
client1
想要单独发送私聊的信息给client4
,这个时候怎么办?假如
client1
想要发送群聊消息给所有的客户端,这个时候怎么办?假如后台管理人员想要发送系统消息广播给所有人,这个时候怎么办?
模拟
nginx.conf
# Derive the Connection header sent to the backend from the client's Upgrade
# header: "upgrade" when an Upgrade header is present, "close" when it is empty.
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
# Pool of backend WebSocket servers. No balancing method is specified here,
# so nginx's default selection applies.
upstream websocket {
server 127.0.0.1:9511;
server 127.0.0.1:9522;
}
server {
listen 80;
server_name www.abc.com;
location / {
# Forward every request to the "websocket" upstream pool.
proxy_pass http://websocket;
# HTTP/1.1 plus the Upgrade/Connection headers below let the WebSocket
# handshake pass through the proxy to the backend.
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
}
}
ws1.php
<?php
/**
 * ws1.php - single-node broadcast demo listening on port 9511.
 *
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 AM
 *
 * Every frame received is pushed back out to each established WebSocket
 * connection held by this server instance.
 */
$server = new Swoole\WebSocket\Server("0.0.0.0", 9511);

// Handshake finished: log the new fd and greet the client.
$onOpen = function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd" . $request->fd . "\n";
    $server->push($request->fd, "hello, welcome\n");
};

// Frame received: log it, then fan it out to every connection of this server.
$onMessage = function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from " . $frame->fd . ":" . $frame->data
        . ",opcode:" . $frame->opcode . ",fin:" . $frame->finish . "\n";
    foreach ($server->connections as $fd) {
        // Skip anything that is not a fully established WebSocket connection,
        // otherwise the push may fail.
        if (!$server->isEstablished($fd)) {
            continue;
        }
        $server->push($fd, $frame->data);
    }
};

// Connection closed: log which fd went away.
$onClose = function ($ser, $fd) {
    echo "client " . $fd . " closed\n";
};

$server->on('open', $onOpen);
$server->on('message', $onMessage);
$server->on('close', $onClose);
$server->start();
ws2.php
<?php
/**
 * ws2.php - single-node broadcast demo listening on port 9522.
 *
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 AM
 *
 * Every frame received is pushed back out to each established WebSocket
 * connection held by this server instance.
 */
$server = new Swoole\WebSocket\Server("0.0.0.0", 9522);

// Handshake finished: log the new fd and greet the client.
$onOpen = function (Swoole\WebSocket\Server $server, $request) {
    echo "server: handshake success with fd" . $request->fd . "\n";
    $server->push($request->fd, "hello, welcome\n");
};

// Frame received: log it, then fan it out to every connection of this server.
$onMessage = function (Swoole\WebSocket\Server $server, $frame) {
    echo "receive from " . $frame->fd . ":" . $frame->data
        . ",opcode:" . $frame->opcode . ",fin:" . $frame->finish . "\n";
    foreach ($server->connections as $fd) {
        // Skip anything that is not a fully established WebSocket connection,
        // otherwise the push may fail.
        if (!$server->isEstablished($fd)) {
            continue;
        }
        $server->push($fd, $frame->data);
    }
};

// Connection closed: log which fd went away.
$onClose = function ($ser, $fd) {
    echo "client " . $fd . " closed\n";
};

$server->on('open', $onOpen);
$server->on('message', $onMessage);
$server->on('close', $onClose);
$server->start();
websocket测试工具
两个客户端都连接上了,也都发送了消息,但是都收不到彼此的消息。我们上面的代码是用 foreach 循环对当前服务器上的所有连接逐个 push 的,收不到消息说明对方的 fd 不在同一台服务器上。这完全符合我们之前的设想。那么如何才能做到相互收到消息呢?现在开始进入我们的主题。
方案
Redis
MQ (以RabbitMQ为例)
Redis
主要用于存取用户与服务器(及 fd)的对应关系,MQ
主要用于多台服务器之间的通讯和消息共享。
每个服务端都订阅自己的 queue
进行消费。比如:client1
想要发送消息给 client4
, 这个时候的步骤如下:
client1
发送消息给client4
,先从Redis
中获取client4
的fd与服务器关系。拿到关系后,直接push到 MQ中,
{"queueName":"queue2","toId":"1","fd":"1"...}
让消费者监听处理。消费者监听某个
queue
,进行消费处理。
代码
ws01.php
<?php
/**
 * ws01.php - WebSocket node "ws01" of the distributed chat demo.
 *
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 AM
 *
 * Redis keys written by this node:
 *   <userName>  => JSON {fd, host, port, userName} (where the user is connected)
 *   <host>:<fd> => userName                        (reverse lookup used on close)
 *
 * A "sendto" message is delivered directly when the recipient is connected to
 * this node; otherwise it is published to the RabbitMQ topic exchange
 * "messages" using the recipient's host name as routing key.
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws01";
$port = 9511;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);

// NOTE(review): this connection is opened before $server->start(); if the
// server runs more than one worker process they would all share this socket -
// confirm the worker configuration or move the connect into a workerStart hook.
$redis = new Redis;
$redis->connect("redis", 6379);

// Handshake finished: tell the client it may now send its "login" message.
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type" => "open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use ($redis, $host, $port) {
    $data = json_decode($frame->data, true);
    // Ignore frames that are not a JSON object carrying a "type".
    if (!is_array($data) || !isset($data["type"])) {
        return;
    }

    switch ($data["type"]) {
        case 'login':
            if (!isset($data["userName"]) || !is_string($data["userName"]) || $data["userName"] === '') {
                break;
            }
            // NOTE(review): the user mapping expires after 600 seconds while the
            // reverse key below never expires - confirm this asymmetry is intended.
            $redis->set(
                $data["userName"],
                json_encode(["fd" => $frame->fd, "host" => $host, "port" => $port, "userName" => $data["userName"]]),
                600
            );
            $redis->set($host . ':' . $frame->fd, $data["userName"]);
            $server->push($frame->fd, json_encode(["type" => "message", "content" => "hello, welcome " . $data['userName']]));
            break;

        case 'sendto':
            if (!isset($data["to"], $data["from"], $data["content"])) {
                break;
            }
            $payload = json_encode(["type" => "message", "content" => $data["content"]]);

            // Echo the message back to the sender first.
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, $payload);
            }

            // Recipient record.
            $toRaw = $redis->get($data["to"]);
            $to = is_string($toRaw) ? json_decode($toRaw, true) : null;
            // Sender record.
            $fromRaw = $redis->get($data["from"]);
            $from = is_string($fromRaw) ? json_decode($fromRaw, true) : null;

            // Unknown or malformed recipient: nothing more to deliver.
            if (!is_array($to) || !isset($to["fd"], $to["host"])) {
                break;
            }

            if ($to["host"] === $host) {
                // Recipient is on this node. The host check matters: fd numbers are
                // only unique per node, so a remote recipient's fd may also exist here.
                $toFd = (int) $to["fd"];
                if ($data["from"] != $data["to"] && $server->exist($toFd) && $server->isEstablished($toFd)) {
                    $server->push($toFd, $payload);
                }
            } elseif ($from) {
                // Recipient is on another node: hand the message to RabbitMQ, routed
                // by the target node's host name.
                try {
                    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                    try {
                        $channel = $connection->channel();
                        $channel->exchange_declare('messages', 'topic', false, false, false);
                        $msg = new AMQPMessage(json_encode([
                            "to" => $to["userName"] ?? $data["to"],
                            "content" => $data["content"],
                            "fd" => $to["fd"],
                            "host" => $to["host"],
                            "port" => $to["port"] ?? null,
                        ]));
                        $channel->basic_publish($msg, 'messages', $to["host"]);
                        $channel->close();
                    } finally {
                        // Always release the broker connection, even if publishing failed.
                        $connection->close();
                    }
                } catch (\Exception $e) {
                    echo "mq publish failed: {$e->getMessage()}\n";
                }
            }
            break;

        case 'message':
            // Relay entry point used by the MQ consumer: push content to a local fd.
            // NOTE(review): any client can send this type with an arbitrary fd;
            // consider authenticating the consumer.
            if (!isset($data["fd"], $data["content"])) {
                break;
            }
            $targetFd = (int) $data["fd"];
            if ($server->exist($targetFd) && $server->isEstablished($targetFd)) {
                $server->push($targetFd, json_encode(["type" => "message", "content" => $data["content"]]));
            }
            break;
    }
});

// Connection closed: remove this connection's Redis entries.
$server->on('close', function ($ser, $fd) use ($redis, $host) {
    // Same key format as written on login.
    $reverseKey = $host . ":" . $fd;
    $userName = $redis->get($reverseKey);
    if (is_string($userName) && $userName !== '') {
        $raw = $redis->get($userName);
        $current = is_string($raw) ? json_decode($raw, true) : null;
        // Only delete the user mapping if it still points at this very connection;
        // the user may already have logged in again elsewhere.
        if (is_array($current)
            && ($current["host"] ?? null) === $host
            && (int) ($current["fd"] ?? -1) === (int) $fd
        ) {
            $redis->del($userName);
        }
    }
    $redis->del($reverseKey);
    echo "client {$fd} closed\n";
});

$server->start();
ws02.php
<?php
/**
 * ws02.php - WebSocket node "ws02" of the distributed chat demo.
 *
 * User: gan
 * Date: 2019/11/2
 * Time: 9:21 AM
 *
 * Redis keys written by this node:
 *   <userName>  => JSON {fd, host, port, userName} (where the user is connected)
 *   <host>:<fd> => userName                        (reverse lookup used on close)
 *
 * A "sendto" message is delivered directly when the recipient is connected to
 * this node; otherwise it is published to the RabbitMQ topic exchange
 * "messages" using the recipient's host name as routing key.
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$host = "ws02";
// NOTE(review): make sure the load balancer's upstream entry for this node
// points at this port.
$port = 9512;
$server = new Swoole\WebSocket\Server("0.0.0.0", $port);

// NOTE(review): this connection is opened before $server->start(); if the
// server runs more than one worker process they would all share this socket -
// confirm the worker configuration or move the connect into a workerStart hook.
$redis = new Redis;
$redis->connect("redis", 6379);

// Handshake finished: tell the client it may now send its "login" message.
$server->on('open', function (Swoole\WebSocket\Server $server, $request) {
    $server->push($request->fd, json_encode(["type" => "open"]));
});

$server->on('message', function (Swoole\WebSocket\Server $server, $frame) use ($redis, $host, $port) {
    $data = json_decode($frame->data, true);
    // Ignore frames that are not a JSON object carrying a "type".
    if (!is_array($data) || !isset($data["type"])) {
        return;
    }

    switch ($data["type"]) {
        case 'login':
            if (!isset($data["userName"]) || !is_string($data["userName"]) || $data["userName"] === '') {
                break;
            }
            // NOTE(review): the user mapping expires after 600 seconds while the
            // reverse key below never expires - confirm this asymmetry is intended.
            $redis->set(
                $data["userName"],
                json_encode(["fd" => $frame->fd, "host" => $host, "port" => $port, "userName" => $data["userName"]]),
                600
            );
            $redis->set($host . ':' . $frame->fd, $data["userName"]);
            $server->push($frame->fd, json_encode(["type" => "message", "content" => "hello, welcome " . $data['userName']]));
            break;

        case 'sendto':
            if (!isset($data["to"], $data["from"], $data["content"])) {
                break;
            }
            $payload = json_encode(["type" => "message", "content" => $data["content"]]);

            // Echo the message back to the sender first.
            if ($server->exist($frame->fd) && $server->isEstablished($frame->fd)) {
                $server->push($frame->fd, $payload);
            }

            // Recipient record.
            $toRaw = $redis->get($data["to"]);
            $to = is_string($toRaw) ? json_decode($toRaw, true) : null;
            // Sender record.
            $fromRaw = $redis->get($data["from"]);
            $from = is_string($fromRaw) ? json_decode($fromRaw, true) : null;

            // Unknown or malformed recipient: nothing more to deliver.
            if (!is_array($to) || !isset($to["fd"], $to["host"])) {
                break;
            }

            if ($to["host"] === $host) {
                // Recipient is on this node. The host check matters: fd numbers are
                // only unique per node, so a remote recipient's fd may also exist here.
                $toFd = (int) $to["fd"];
                if ($data["from"] != $data["to"] && $server->exist($toFd) && $server->isEstablished($toFd)) {
                    $server->push($toFd, $payload);
                }
            } elseif ($from) {
                // Recipient is on another node: hand the message to RabbitMQ, routed
                // by the target node's host name.
                try {
                    $connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
                    try {
                        $channel = $connection->channel();
                        $channel->exchange_declare('messages', 'topic', false, false, false);
                        $msg = new AMQPMessage(json_encode([
                            "to" => $to["userName"] ?? $data["to"],
                            "content" => $data["content"],
                            "fd" => $to["fd"],
                            "host" => $to["host"],
                            "port" => $to["port"] ?? null,
                        ]));
                        $channel->basic_publish($msg, 'messages', $to["host"]);
                        $channel->close();
                    } finally {
                        // Always release the broker connection, even if publishing failed.
                        $connection->close();
                    }
                } catch (\Exception $e) {
                    echo "mq publish failed: {$e->getMessage()}\n";
                }
            }
            break;

        case 'message':
            // Relay entry point used by the MQ consumer: push content to a local fd.
            // NOTE(review): any client can send this type with an arbitrary fd;
            // consider authenticating the consumer.
            if (!isset($data["fd"], $data["content"])) {
                break;
            }
            $targetFd = (int) $data["fd"];
            if ($server->exist($targetFd) && $server->isEstablished($targetFd)) {
                $server->push($targetFd, json_encode(["type" => "message", "content" => $data["content"]]));
            }
            break;
    }
});

// Connection closed: remove this connection's Redis entries.
$server->on('close', function ($ser, $fd) use ($redis, $host) {
    // Same key format as written on login.
    $reverseKey = $host . ":" . $fd;
    $userName = $redis->get($reverseKey);
    if (is_string($userName) && $userName !== '') {
        $raw = $redis->get($userName);
        $current = is_string($raw) ? json_decode($raw, true) : null;
        // Only delete the user mapping if it still points at this very connection;
        // the user may already have logged in again elsewhere.
        if (is_array($current)
            && ($current["host"] ?? null) === $host
            && (int) ($current["fd"] ?? -1) === (int) $fd
        ) {
            $redis->del($userName);
        }
    }
    $redis->del($reverseKey);
    echo "client {$fd} closed\n";
});

$server->start();
consumer.php
<?php
/**
 * consumer.php - RabbitMQ consumer that relays cross-node chat messages.
 *
 * User: gan
 * Date: 2019/11/2
 * Time: 11:50 AM
 *
 * Usage: php consumer.php <binding_key> [<binding_key> ...]
 *
 * Binds a fresh queue to the "messages" topic exchange for every binding key
 * given on the command line. Each consumed message is a JSON object with
 * host, port, fd and content; the consumer opens a WebSocket connection to
 * host:port and pushes a {"type":"message", fd, content} frame to it.
 */
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin@1024');
$channel = $connection->channel();
$channel->exchange_declare('messages', 'topic', false, false, false);

// An empty queue name asks the broker to generate one; only the name is needed.
[$queue_name] = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}
foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'messages', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";

    $data = json_decode($msg->body, true);
    // Drop anything that does not carry the fields needed for the relay.
    if (!is_array($data) || !isset($data["host"], $data["port"], $data["fd"], $data["content"])) {
        file_put_contents('php://stderr', " [!] dropping malformed message\n");
        return;
    }

    Co\run(function () use ($data) {
        $client = new Swoole\Coroutine\Http\Client($data["host"], (int) $data["port"]);
        if ($client->upgrade("/")) {
            $client->push(json_encode([
                "type" => "message",
                "fd" => $data["fd"],
                "content" => $data["content"],
            ]));
        } else {
            file_put_contents(
                'php://stderr',
                " [!] websocket upgrade to {$data["host"]}:{$data["port"]} failed\n"
            );
        }
        // Release the socket; otherwise one connection leaks per relayed message.
        $client->close();
    });
};

// The fourth argument (no_ack) is true: messages are acknowledged on delivery,
// so a relay that fails above is not redelivered.
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
    $channel->wait();
}
$channel->close();
$connection->close();
index.html
<!DOCTYPE html>
<!-- Chat demo page: a scrolling message list plus a recipient/content form whose button calls send(). -->
<html>
<head>
<title></title>
<meta http-equiv="content-type" content="text/html;charset=utf-8">
<style>
/* Each chat line is rendered as a left-aligned paragraph. */
p {
text-align: left;
padding-left: 20px;
}
</style>
</head>
<body>
<div style="width: 600px;height: 600px;margin: 30px auto;text-align: center">
<!-- Message pane: the script appends one <p> per message to #msg_list. -->
<div style="width: 600px;border: 1px solid gray;height: 300px;">
<div id="msg_list" style="width:600px;height: 300px;overflow: scroll;float: left;">
</div>
</div>
<br>
<!-- Compose form: #touser is the recipient user name, #msg_box the message text. -->
<div style="width: 600px;height: 200px;text-align: left;">
用户名:<input type="text" name="touser" id="touser" placeholder="请输入发送的用户名">
<br>
内容:<textarea id="msg_box" rows="6" cols="50"></textarea><br>
<input type="button" value="发送" onclick="send()">
</div>
</div>
</body>
</html>
<script type="text/javascript">
// Ask for a display name; fall back to the generated default when the prompt
// is cancelled or left blank, so the login message always carries a name.
var defaultName = 'user' + uuid(8, 16);
var uname = prompt('请输入用户名', defaultName) || defaultName;
var ws = new WebSocket("ws://www.abc.com");

// Socket opened: show a local status line.
ws.onopen = function () {
    listMsg("系统消息:建立连接成功");
};

// Frame received from the server.
ws.onmessage = function (e) {
    var msg;
    try {
        msg = JSON.parse(e.data);
    } catch (err) {
        // Not JSON: nothing this page knows how to render.
        console.warn('ignoring non-JSON frame', e.data);
        return;
    }
    if (!msg) {
        return;
    }
    if (msg.type === 'open') {
        // The server signals the handshake is done; answer with our login.
        sendMsg({'type': 'login', 'userName': uname});
        return;
    }
    // Every other type (including 'system') is displayed by its content.
    listMsg(msg.content);
};

// Socket error: show a local status line.
ws.onerror = function () {
    listMsg("系统消息 : 出错了,请退出重试.");
};
// Submit handler for the send button: reads the recipient and the message
// text from the form, sends a "sendto" message, then clears the text box.
function send() {
    // Message text
    var msg_box = document.getElementById("msg_box");
    // Strip line breaks; textarea values use "\n", so match both "\r\n" and "\n".
    var content = msg_box.value.replace(/\r?\n/g, "").trim();
    // Recipient user name
    var recipient = document.getElementById("touser").value.trim();
    // Without a recipient there is nobody to deliver to.
    if (!recipient) {
        return;
    }
    var msg = {'content': content, 'type': 'sendto', 'from': uname, 'to': recipient};
    sendMsg(msg);
    msg_box.value = '';
}
// Append one message line to the message list and scroll it into view.
function listMsg(data) {
    var msg_list = document.getElementById("msg_list");
    var msg = document.createElement("p");
    // Chat content comes from other users: insert it as text, never as HTML,
    // so markup in a message cannot run script in this page.
    msg.textContent = data;
    msg_list.appendChild(msg);
    msg_list.scrollTop = msg_list.scrollHeight;
}
// Serialise a message object to JSON, log it, and send it over the socket.
function sendMsg(msg) {
    var payload = JSON.stringify(msg);
    console.log(payload);
    ws.send(payload);
}
// Random identifier generator.
// With len: returns len random characters drawn from the first radix symbols
// of the alphabet (radix defaults to the full alphabet length).
// Without len: returns a 36-character id with '-' at indexes 8, 13, 18 and 23,
// '4' at index 14, one of 8/9/A/B at index 19 and random hex digits elsewhere.
function uuid(len, radix) {
    var alphabet = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'.split('');
    var out = [];
    var pos;
    radix = radix || alphabet.length;

    if (!len) {
        out[8] = '-';
        out[13] = '-';
        out[18] = '-';
        out[23] = '-';
        out[14] = '4';
        for (pos = 0; pos < 36; pos += 1) {
            if (out[pos]) {
                continue; // fixed position, already filled
            }
            var nibble = (Math.random() * 16) | 0;
            out[pos] = alphabet[pos === 19 ? ((nibble & 0x3) | 0x8) : nibble];
        }
        return out.join('');
    }

    for (pos = 0; pos < len; pos += 1) {
        out[pos] = alphabet[(Math.random() * radix) | 0];
    }
    return out.join('');
}
</script>
运行
php ws01.php #启动ws01
php ws02.php #启动ws02
php consumer.php "ws01" #监听ws1 queue
php consumer.php "ws02" #监听ws2 queue