Centrifugo实时通讯,基于go语言编写,提供了websocket 以及sockjs 的兼容处理
安装
npm install centrifuge
import Centrifuge, { Subscription } from 'centrifuge';
/** 创建连接 */
public async connect(url: string, token: string): Promise<CentrifugeGo> {
this.clear();
this.enable = true;
this.url = url;
this.token = token;
this.conn = new Centrifuge(this.url);
this.conn.setToken(this.token);
this.conn.on('disconnect', _ctx => {
console.log('centrifuge disconnect: ', _ctx, this.enable);
if (this.enable) {
this.retryConnect();
}
});
return await this.retryConnect();
}
/** 重连 */
public retryConnect(): Promise<CentrifugeGo> {
return new Promise((resolve, _reject) => {
this.conn.on('connect', (_ctx: CentrifugGoCtxI) => {
this.ctx = _ctx;
this.emit('connect');
this.conn.removeAllListeners('connect');
resolve(this);
});
this.conn.connect();
});
}
/** 断开连接 */
public disconnect() {
this.enable = false;
console.log('centrifuge disconnect() enable:', this.enable);
this.clear();
this.conn?.disconnect();
}
/** 清除订阅 */
private clear() {
this.conn?.removeAllListeners();
while (this.subscriptionList.length) {
this.subscriptionList
.pop()
?.removeAllListeners()
?.unsubscribe();
}
}
/** 订阅 */
// 订阅通道:在需要的地方主动订阅
public subscribe(channel: string) {
this.subscriptionList = [];
const sub = this.conn
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.subscribe(channel, (msg: { event: string; data: any }) => {
// console.log(`subscribe: ${element}:::`, msg.data);
this.emit(msg.data.event, msg.data.data);
})
.on('error', errContext => {
console.log('channel-error: ', errContext);
})
.on('subscribe', context => {
console.log('channel-subscribe: ', context);
})
.on('unsubscribe', context => {
console.log('channel-unsubscribe: ', context);
});
this.subscriptionList.push(sub);
}
// 取消订阅
public unsubscribe(channel: string) {
this.subscriptionList.map((item, index) => {
if (item.channel === channel) {
console.log(item);
item.removeAllListeners();
item.unsubscribe();
this.subscriptionList.splice(index, 1);
}
});
}