最近在梳理一些知识点,已脱敏并去除公司实现,做一些自己理解上的实践。
结构
本次打算模拟下一个实时双工交互的业务实践,先来张图。
可以看出,实时双工通信的基础在于Redis部分,核心就在于Pub/Sub模型,其余部分在此基础上丰富了交互内容。
- Server端 ,用于模拟平时业务机器,对来自客户端的Request给予Response。
-
WebSocket Server端,比如直播业务中在直播间内聊天,肯定要用websocket来维系链接状态,这里可以做到语言无关,既可以用Java,也可以用golang。原理都是类似的。根据双工的特征,websocket服务器与客户端发生信息交互的场景无非主动和被动,场景如下:
- 主动触发, 指的是来自另一个客户端的action,触发了websocket服务器的push行为。
- 被动触发,比如定时器触发,状态检查等行为,都属于被动触发
- APP端,一般团队都会以APP形式落地到终端用户,web网页也是类似。在直播场景中,主播、观众实际上可以统一虚化为客户端。
实现
从图示上来看,每一个模块都不难,一点点去实现就好了。因为不想涉及公司业务上的东西,所以会有一些改动,如下:
- APP端我这里会用JavaScript来作为客户端,简单模拟下就。
- WebSocket服务器端,不打算用公司里Java版本,想试试golang版本。
- Server端就用PHP简单模拟下。
websocket服务器端实现
// server.go
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/garyburd/redigo/redis"
"golang.org/x/net/websocket"
)
var clients map[*websocket.Conn]string = make(map[*websocket.Conn]string)
// websocket 服务器端测试
func Echo(ws *websocket.Conn) {
var err error
if _, ok := clients[ws]; ok != true {
clients[ws] = "匿名"
}
for {
var reply string
if err = websocket.Message.Receive(ws, &reply); err != nil {
fmt.Println("Cannot receive")
break
}
fmt.Println("Current client number: ", len(clients))
fmt.Println("Received back from client: ", reply)
msg := "RECEIVED: " + reply
fmt.Println("Sending to client: " + msg)
for client, _ := range clients {
fmt.Println(client)
if err = websocket.Message.Send(client, msg); err != nil {
fmt.Println("Sending failed")
break
}
}
}
}
func tick() {
for {
time.Sleep(time.Second * 10)
fmt.Println("checking ping")
for key, _ := range clients {
if key.IsClientConn() == false {
// delete(clients, key)
}
// 对所有客户端进行订阅消息的推送
consume(clients)
}
}
}
func consume(clients map[*websocket.Conn]string) {
client, err := redis.Dial("tcp", "localhost:6379")
defer client.Close()
if err != nil {
fmt.Println(err)
}
psc := redis.PubSubConn{Conn: client}
psc.Subscribe("channel")
for {
switch v := psc.Receive().(type) {
case redis.Message:
fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
for client, _ := range clients {
fmt.Println(client)
if err = websocket.Message.Send(client, string(v.Data)); err != nil {
fmt.Println("Sending failed")
break
}
}
case redis.Subscription:
fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
}
}
}
func main() {
http.Handle("/", websocket.Handler(Echo))
go tick()
if err := http.ListenAndServe(":1234", nil); err != nil {
log.Fatal("ListenAndServe failed: ", err)
}
}
app端实现
// client.js
var wsServer = 'ws://localhost:1234';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) {
console.log("Connected to WebSocket server.");
};
websocket.onclose = function (evt) {
console.log("Disconnected");
};
websocket.onmessage = function (evt) {
console.log('Retrieved data from server: ' + evt.data);
};
websocket.onerror = function (evt, e) {
console.log('Error occured: ' + evt.data);
};
// 发送消息
websocket.send("hello world!")
server端实现
<?php
$redis = new Redis();
$redis->pconnect("localhost", 6379);
$ret = $redis->publish("channel", "data from PHP");
var_dump($ret);
测试
按照图例,打算对主动触发和被动触发进行下测试。
主动触发
主动触发其实就是对websocket基本功能的测试,一般都是开俩客户端,一段发消息,看看另一端是否能收到就可以了,流程是
1.启动websocket服务器
→ go run server.go
2.客户端连接websocket服务器
Chrome 打开调试器console输入上面的JavaScript代码即可。
3.交互测试
被动触发
被动触发一般都是定时任务,如定时器timer来触发的。在上面golang代码中,有这么一段调用。
go tick()
// 内部调用了consume方法,来实现对subscribe消息的消费
1.启动websocket服务器
➜ go run server.go
checking ping
2.客户端js链接websocket服务器
3.PHP去publish消息
➜ php publish.php
int(1)
4.查看客户端是否可以收到websocket消费到的数据
整理
至此,基本上双工实时通信的demo就结束了。相比于公司Java实现的版本,大体框架是类似的,无非是有没有业务的支撑。