owt-server 的集群管理者、集群工作站、消息队列(二)





上一篇《owt-server 的集群管理者、集群工作站、消息队列(一)》主要介绍了 owt-server 的集群管理者(clusterManager)的相关定义,同时提到消息交互的底层依赖于消息队列。owt-server 使用 Node.js 的 amqp 库连接 RabbitMQ 实例,并在源码中对 amqp 操作做了进一步封装,形成便于其他模块调用的消息队列接口。

本篇将对上述 amqp 封装模块 amqp_client.js 进行分解介绍。在开始分解之前,先简要介绍一下 AMQP 协议(AMQP 是一种协议,定义了消息队列的组织模型和基本概念)。了解 AMQP 协议有助于初学者快速理解消息队列的原理,走读 owt-server 以及其他使用消息队列的源码时也会轻松很多。

读完本篇,你将对 owt-server 的集群管理(clusterManager)和底层消息机制(rpc、消息队列)有一定了解。在《owt-server 的集群管理者、集群工作站、消息队列(三)》中,将开始介绍集群工作者(clusterWorker):clusterWorker 同样利用本篇所述的 rpc 机制与 clusterManager(准确地说是 master)进行交互,owt-server 系统的全貌也将由此逐渐展现。

2、amqp(Advanced Message Queuing Protocol)协议简要介绍

如果你已经了解“高级消息队列协议”(Advanced Message Queuing Protocol,缩写为 AMQP),可以直接跳过本节。

如果你需要深入了解 AMQP 协议、学习一些例子,可以参阅 RabbitMQ 的官方资料或其他优秀的博客,比如:

   RabbitMQ 对 AMQP 0-9-1 模型的介绍:[https://www.rabbitmq.com/tutorials/amqp-concepts.html](https://www.rabbitmq.com/tutorials/amqp-concepts.html)


下图中包含了 AMQP 模型的几个元素:publisher、exchange、routes、queue、consumer。

publisher: 消息的发布者

consumer: 消息的消费者

exchange(交换机):接收 publisher 发布的消息,并按照与之关联的 routes 将消息路由到相应的 queue

routes: 路由规则,即 queue 与 exchange 之间的绑定(binding)及其路由键(routing key)

queue: 存储消息的队列



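exchange 有以下四种类型:

Direct exchange 直连式:只有当消息的 routing key 与绑定键完全相同时,才会被路由到对应的 queue(owt-server 的 rpc 使用此类型)

Fanout exchange 广播式:忽略 routing key,把消息投递到所有与之绑定的 queue

Topic exchange 主题式:按带通配符的模式匹配 routing key(`*` 匹配一个单词,`#` 匹配零个或多个单词;owt-server 的 topicParticipant、faultMonitor 使用此类型)

Headers exchange 头匹配式:根据消息头(headers)中的属性而不是 routing key 进行匹配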




3、owt-server 中 amqp 封装模块分解

amqp_client.js 位于 owt-server 源码目录下的 **source/common/** 文件夹中:

root@ubuntu:/home/wonder/OWT/owt-server-master/source/common# ls

amqp_client.js cipher.js clusterWorker.js loadCollector.js logger.js makeRPC.js mediaUtil.js rpcChannel.js

查看 amqp_client.js 文件,其中有以下定义:

3.1 var declareExchange 定义

var declareExchange = function(conn, name, type, autoDelete, on_ok, on_failure) {    // Declares an exchange; conn is the RabbitMQ connection instance.
    // NOTE(review): this excerpt stops after `on_ok(exc);` — the closing braces and the rest of the
    // function are not shown, so neither the `ok` flag nor `on_failure` is used in what is visible.
    var ok = false;    // flipped to true once the exchange-open callback below has run
    var exc = conn.exchange(name, {type: type, autoDelete: autoDelete}, function (exchange) {    // Ask the connection for an exchange with the given name, type and autoDelete flag; the callback runs once it is open.
        log.debug('Exchange ' + exchange.name + ' is open');
        ok = true;
        on_ok(exc);    // Success: hand `exc` (the object returned by conn.exchange) back to the caller.



3.2 var rpcClient 定义


var rpcClient = function(bus, conn, on_ready, on_failure) {    // RPC client. bus: caller-supplied value exposed as handler.bus; conn: the RabbitMQ connection.
    // NOTE(review): this excerpt has lost its closing braces/brackets and at least one declaration line
    // (see below), so it is not valid JavaScript as shown; compare with upstream source/common/amqp_client.js.
    var handler = {bus: bus};
    var call_map = {},    // pending calls keyed by correlation id: {fn: reply handlers, timer: timeout handle}
        corrID = 0,    // next correlation id to hand out
        ready = false,    // true once the reply queue is bound and subscribed
    // NOTE(review): the declaration above ends with a trailing comma and the line that completed it is
    // missing. `exc` and `reply_q` are assigned below but never declared in this excerpt, so that line
    // presumably declared them — confirm against upstream.
    var queueBindingName;    // name of the reply queue, sent as replyTo with every call
    declareExchange(conn, 'owtRpc', 'direct', true, function (exc_got) {    // Declare the 'owtRpc' direct exchange (autoDelete = true).
        exc = exc_got;    // keep the declared exchange
        // NOTE(review): the queue name here is a single space (' '), whereas topicParticipant and faultMonitor
        // pass '' (in AMQP 0-9-1 an empty name asks the broker to generate a unique one). A fixed name would
        // presumably make every rpcClient share one reply queue; this looks like a transcription error for '' —
        // confirm against upstream.
        reply_q = conn.queue( ' ', function (q) {    // Create the reply queue on which responses to this client's calls arrive.
            log.debug('Reply queue for rpc client ' + q.name + ' is open');
            // Save queueBindingName once.
            queueBindingName = q.name;    // remember the reply queue's name
            reply_q.bind(exc.name, queueBindingName, function () {    // Bind the reply queue to the exchange, using its own name as the routing key.
                reply_q.subscribe(function (message) {    // Start consuming replies.
                    try {
                        log.debug('New message received', message);
                        if(call_map[message.corrID] !== undefined) {    // The reply belongs to a call that is still pending.
                            log.debug('Callback', message.type, ' - ', message.data);
                            clearTimeout(call_map[message.corrID].timer);    // cancel this call's timeout
                            // Dispatch to the handler named by message.type (e.g. 'callback'), passing data and err.
                            // NOTE(review): if there is no handler for message.type this throws a TypeError, which the
                            // catch below only logs. The timer is already cleared, but the call_map entry is never
                            // deleted on that path, so it stays in call_map.
                            call_map[message.corrID].fn[message.type].call({}, message.data, message.err);
                            if (call_map[message.corrID].fn['onStatus']) {    // Calls with an onStatus handler keep their entry for REMOVAL_TIMEOUT (presumably so further status replies can still be dispatched).
                                setTimeout(function() {    // remove the entry after REMOVAL_TIMEOUT
                                    (call_map[message.corrID] !== undefined) &&  (delete call_map[message.corrID]);
                                }, REMOVAL_TIMEOUT);
                            } else {    // otherwise remove the entry right away
                                (call_map[message.corrID] !== undefined) && (delete call_map[message.corrID]);
                        } else {
                          // No pending entry: the call already timed out or its entry was removed.
                          log.warn('Late rpc reply:', message);
                    } catch(err) {
                        log.error('Error processing response: ', err);
                ready = true;    // reply queue is bound and subscribed; calls can now be issued
                on_ready();    // notify the caller that the client is ready
    }, on_failure);    // on_failure is handed straight to declareExchange
    // Issue one RPC.
    //   to:        routing key of the target server's request queue (rpcServer binds its queue with its id)
    //   method:    remote method name
    //   args:      argument array for the remote method
    //   callbacks: map of reply type -> handler (looked up by message.type when a reply arrives)
    //   timeout:   milliseconds before every handler is called with 'timeout'
    handler.remoteCall = function(to, method, args, callbacks, timeout) {
        log.debug('remoteCall, corrID:', corrID, 'to:', to, 'method:', method);
        if (ready) {    // the reply queue is usable
            var corr_id = corrID++;    // allocate this call's correlation id
            call_map[corr_id] = {};
            call_map[corr_id].fn = callbacks || {callback: function() {}};    // reply handlers; defaults to a no-op 'callback'
            call_map[corr_id].timer = setTimeout(function() {    // arm the per-call timeout
                log.debug('remoteCall timeout, corrID:', corr_id);
                if (call_map[corr_id]) {
                    for (var i in call_map[corr_id].fn) {
                        (typeof call_map[corr_id].fn[i] === 'function' ) && call_map[corr_id].fn[i]('timeout');    // call every registered handler with 'timeout'
                    delete call_map[corr_id];    // drop the pending entry
            }, timeout || TIMEOUT);    // TIMEOUT is defined outside this excerpt; it is used whenever timeout is falsy (so 0 also falls back)
            exc.publish(to, {method: method, args: args, corrID: corr_id, replyTo: queueBindingName});    // publish the request; the server replies to replyTo tagged with corrID
        } else {    // reply queue not ready yet
            for (var i in callbacks) {
                (typeof callbacks[i] === 'function' ) && callbacks[i]('error', 'rpc client is not ready');    // fail every handler immediately
    handler.remoteCast = function(to, method, args) {    // Fire-and-forget: no corrID/replyTo is sent, so no reply is expected.
        exc && exc.publish(to, {method: method, args: args});    // publish only once the exchange exists; unlike remoteCall this does not check `ready`
    handler.close = function() {    // Shut the rpc client down.
        for (var i in call_map) {    // clear every pending call's timeout
            // NOTE(review): the loop body is missing from this excerpt; presumably it is
            // clearTimeout(call_map[i].timer) — confirm against upstream. As shown, pending
            // callers' handlers are not invoked when the client closes.
        call_map = {};
        reply_q && reply_q.destroy();
        reply_q = undefined;
        exc && exc.destroy(true);
        exc = undefined;
    return handler;


3.3 var rpcServer 定义


var rpcServer = function(bus, conn, id, methods, on_ready, on_failure) {    // RPC server. bus: caller-supplied value exposed as handler.bus; conn: RabbitMQ connection; id: name of this server's request queue, also used as its routing key; methods: table of RPC method name -> function.
    // NOTE(review): this excerpt has lost its closing braces/brackets, so it is not valid JavaScript as shown.
    var handler = {bus: bus};
    var exc, request_q;
    declareExchange(conn, 'owtRpc', 'direct', true, function (exc_got) {    // Declare the shared 'owtRpc' direct exchange (autoDelete = true).
        exc = exc_got;
        var ready = false;    // local to this callback; set below but not read anywhere in this excerpt
        request_q = conn.queue(id, function (queueCreated) {    // Create the request queue, named after id.
            log.debug('Request queue for rpc server ' + queueCreated.name + ' is open');
            request_q.bind(exc.name, id, function() {    // Bind the request queue to the exchange with id as the routing key.
                request_q.subscribe(function (message) {    // Start consuming requests.
                    try {
                        log.debug('New message received', message);
                        message.args = message.args || [];    // normalise so a reply function can be appended below
                        if (message.replyTo && message.corrID !== undefined) {    // The caller expects a reply (requests without replyTo/corrID, like rpcClient.remoteCast sends, get none).
                            message.args.push(function(type, result, err) {    // Append a reply function as the last argument; the method calls it to answer.
                                exc.publish(message.replyTo, {data: result, corrID: message.corrID, type: type, err: err});
                        if (typeof methods[message.method] === 'function') {    // the requested method exists and is a function
                            // NOTE(review): this lookup follows the prototype chain, so if `methods` is a plain object,
                            // names such as 'constructor' or 'toString' resolve to inherited functions. An own-property
                            // check would confine remote calls to the intended methods.
                            methods[message.method].apply(methods, message.args);    // invoke with `this` bound to methods
                        } else {    // the requested method does not exist
                            log.warn('RPC server does not support this method:', message.method);
                            if (message.replyTo && message.corrID !== undefined) {    // reply with an error if the caller expects a reply
                                exc.publish(message.replyTo, {data: 'error', corrID: message.corrID, type: 'callback', err: 'Not support method'});    // send the error reply
                    } catch (error) {
                        log.error('message:', message);
                        log.error('Error processing call: ', error);
                ready = true;
                // NOTE(review): on_ready is never called in this excerpt (the lines after `ready = true;` are
                // missing); a line invoking it has presumably been lost — confirm against upstream.
    }, on_failure);    // on_failure is handed straight to declareExchange
    handler.close = function() {    // Shut the rpc server down.
        request_q && request_q.destroy();
        request_q = undefined;
        exc && exc.destroy(true);
        exc = undefined;
    return handler;


3.4 var topicParticipant 定义


var topicParticipant = function(bus, conn, excName, on_ready, on_failure) {    // Publish/subscribe helper on a topic exchange. bus: caller-supplied value exposed as that.bus; conn: RabbitMQ connection; excName: name of the topic exchange.
    // NOTE(review): this excerpt has lost its closing braces/brackets and some lines (noted below),
    // so it is not valid JavaScript as shown.
    var that = {bus: bus},
    // NOTE(review): the declaration above ends with a trailing comma and its continuation is missing.
    // `message_processor` is assigned and read below but never declared in this excerpt, so that line
    // presumably declared it — confirm against upstream.
    var exc, msg_q;
    declareExchange(conn, excName, 'topic', false, function (exc_got) {    // Declare the topic exchange (autoDelete = false).
        exc = exc_got;
        var ready = false;    // local to this callback; set below but not read anywhere in this excerpt
        msg_q = conn.queue('', function (queueCreated) {    // Create this participant's queue with an empty name; the created queue's name is read back for logging.
            log.debug('Message queue for topic participant is open:', queueCreated.name);
            ready = true;
            // NOTE(review): on_ready is never called in this excerpt; a line invoking it has presumably been lost.
    }, on_failure);
    // Subscribe to the routing-key patterns in `patterns` (an array); on_message receives each message.
    // on_ok is not used anywhere in this excerpt.
    that.subscribe = function(patterns, on_message, on_ok) {
        if (msg_q) {    // msg_q is only set once the declareExchange callback has run; before that this call does nothing in the code shown
            message_processor = on_message;    // a single handler for all patterns; replaces any previous one
            patterns.map(function(pattern) {    // bind the queue to the exchange once per pattern (map is used only for its side effects)
                msg_q.bind(exc.name, pattern, function() {
                    log.debug('Follow topic [' + pattern + '] ok.');
            // NOTE(review): every call to that.subscribe calls msg_q.subscribe again; whether repeated calls
            // register duplicate consumers depends on the amqp library — confirm.
            msg_q.subscribe(function(message) {    // start consuming messages
                try {
                    message_processor && message_processor(message);    // hand the message to the current handler, if any
                } catch (error) {
                    log.error('Error processing topic message:', message, 'and error:', error);
    // Stop following the given patterns. This clears the handler for ALL patterns, not just the ones listed.
    that.unsubscribe = function(patterns) {
        if (msg_q) {
            message_processor = undefined;
            patterns.map(function(pattern) {    // unbind each pattern from the exchange
                msg_q.unbind(exc.name, pattern);
                log.debug('Ignore topic [' + pattern + ']');
    that.publish = function(topic, data) {
        exc && exc.publish(topic, data);    // publish data with `topic` as the routing key (no-op until the exchange exists)
    that.close = function () {
        // NOTE(review): a line is missing here in this excerpt; msg_q is dropped below without being
        // destroyed, so it was presumably `msg_q && msg_q.destroy();` — confirm against upstream.
        msg_q = undefined;
        exc && exc.destroy(true);
        exc = undefined;
    return that;


3.4 var faultMonitor定义


var faultMonitor = function(bus, conn, on_message, on_ready, on_failure) {    // Listens on the 'owtMonitoring' topic exchange for routing keys matching 'exit.#'. bus: caller-supplied value exposed as that.bus; conn: RabbitMQ connection; on_message: handler for each monitoring message (replaceable via setMsgReceiver).
    // NOTE(review): this excerpt has lost its closing braces/brackets and some lines (noted below),
    // so it is not valid JavaScript as shown.
    var that = {bus: bus},
          msg_receiver = on_message;    // current message handler
    var exc, msg_q;
    declareExchange(conn, 'owtMonitoring', 'topic', false, function (exc_got) {    // Declare the 'owtMonitoring' topic exchange (autoDelete = false).
        exc = exc_got;
        var ready = false;    // local to this callback; set below but not read anywhere in this excerpt
        msg_q = conn.queue('', function (queueCreated) {    // Create the monitoring queue with an empty name; the created queue's name is read back for logging.
            log.debug('Message queue for monitoring is open:', queueCreated.name);
            ready = true;
            // NOTE(review): on_ready is never called in this excerpt; a line invoking it has presumably been lost.
            msg_q.bind(exc.name, 'exit.#', function() {    // Bind to the 'exit.#' pattern, which covers the 'exit.' + reason keys published by monitoringTarget.notify.
                log.debug('Monitoring queue bind ok.');
            msg_q.subscribe(function(msg) {    // start consuming monitoring messages
                try {
                    log.debug('received monitoring message:', msg);
                    msg_receiver && msg_receiver(msg);    // hand the message to the current handler, if any
                } catch (error) {
                    log.error('Error processing monitored message:', msg, 'and error:', error);
    }, on_failure);
    that.setMsgReceiver = function (on_message) {    // Replace the message handler.
        msg_receiver = on_message;
    that.close = function () {
        // NOTE(review): a line is missing here in this excerpt; msg_q is dropped below without being
        // destroyed, so it was presumably `msg_q && msg_q.destroy();` — confirm against upstream.
        msg_q = undefined;
        exc && exc.destroy(true);
        exc = undefined;
    return that;


3.5 var monitoringTarget定义


var monitoringTarget = function(bus, conn, on_ready, on_failure) {    // Publishes exit notifications to the 'owtMonitoring' topic exchange. bus: caller-supplied value exposed as that.bus; conn: RabbitMQ connection.
    // NOTE(review): this excerpt has lost its closing braces/brackets and some lines (noted below),
    // so it is not valid JavaScript as shown.
    var that = {bus: bus};
    var exc;
    declareExchange(conn, 'owtMonitoring', 'topic', false, function (exc_got) {    // Declare the 'owtMonitoring' topic exchange (autoDelete = false).
        exc = exc_got;
        // NOTE(review): on_ready is never called in this excerpt; a line invoking it has presumably been lost.
    }, on_failure);
    that.notify = function(reason, message) {    // Publish {reason, message} with routing key 'exit.' + reason (no-op until the exchange exists).
        exc && exc.publish('exit.' + reason, {reason: reason, message: message});
    that.close = function () {
        // NOTE(review): a line is missing here in this excerpt (the other helpers call exc.destroy(true)
        // before dropping the exchange) — confirm against upstream.
        exc = undefined;
    return that;


3.6 amqp_client.js的模块导出定义


module.exports = function() {
    // NOTE(review): this excerpt has lost its closing braces/brackets and several lines, and it stops
    // partway through the module. `that` and `connection` are used below but never declared in what is
    // shown; their declarations were presumably on the missing lines here — confirm against upstream.
    // Connect to RabbitMQ.
    //   options:    connection options for amqp.createConnection; login/password may be overwritten from the
    //               encrypted auth store. NOTE(review): this writes into (and later deletes from) the caller's object.
    //   on_ok:      success callback
    //   on_failure: called with a message when the connection is lost
    that.connect = function(options, on_ok, on_failure) {
        // NOTE(review): this logs `options` before any password is removed, so a password supplied by the
        // caller in options.password would reach the debug log.
        log.debug('Connecting to rabbitMQ server:', options);
        var setupConnection = function(options) {    // Create the amqp connection to RabbitMQ.
            // NOTE(review): the ` * ` lines below are the body of a block comment whose opening `/*` and
            // closing `*/` are missing from this excerpt; as shown they are not valid JavaScript.
             * `amqp.createConnection([options, [implOptions]])` takes two options
             * objects as parameters.  The first options object has these defaults:

             *     { host: 'localhost'
             *     , port: 5672
             *     , login: 'guest'
             *     , password: 'guest'
             *     , connectionTimeout: 10000
             *     , authMechanism: 'AMQPLAIN'
             *     , vhost: '/'
             *     , noDelay: true
             *     , ssl: { enabled : false
             *            }
             *     }

             * The second options are specific to the node AMQP implementation. It has
             * the default values:

             *     { defaultExchangeName: ''
             *     , reconnect: true
             *     , reconnectBackoffStrategy: 'linear'
             *     , reconnectExponentialLimit: 120000
             *     , reconnectBackoffTime: 1000
             *     }

            // Note that we use the default option.
            // So the reconnect is enabled, and reconnect strategy is
            // 'linear', which means the broken connection will try to
            // recover after every 'reconnectBackoffTime' which is
            // 1000ms by default.
            var conn = amqp.createConnection(options);    // Create the connection; it completes asynchronously and reports through the events below.
            var connected = false;    // becomes true on the first 'ready' event
            conn.on('ready', function() {    // Fired on every successful (re)connection.
                // Remove the password from `options` so the log line below does not print it.
                // NOTE(review): this only deletes the `options` property; the value itself is not cleared elsewhere
                // (e.g. authConfig.rabbit.password in the cipher callback). Also, if the amqp library reuses this
                // same object when it reconnects, the password would no longer be there — confirm.
                delete options.password;
                log.info('Connecting to rabbitMQ server OK, options:', options);
                connection = conn;
                // The 'ready' event will be triggered each time
                // the connection is OK. So just invoke the success
                // callback once to avoid the duplicate logic when
                // reconnecting is done.
                if (!connected) {
                    connected = true;
                    // NOTE(review): on_ok is never called in this excerpt; it was presumably invoked here,
                    // once, as the comment above describes.
            conn.on('error', function(e) {    // Connection error: only logged; reconnecting is left to the library.
                // The amqp client will try to reconnect by
                // the default option, so just notify something here.
                log.info('Connection to rabbitMQ server error', e);
            conn.on('disconnect', function() {     // Connection lost.
                if (connection) {
                    connection = undefined;
                    log.info('Connection to rabbitMQ server disconnected.');
                    on_failure('amqp connection disconnected');    // report the loss to the caller
        if (fs.existsSync(cipher.astore)) {    // If the encrypted auth-store file exists...
            // ...decrypt it with the cipher module (source/common/cipher.js) to obtain the RabbitMQ username and password.
            cipher.unlock(cipher.k, cipher.astore, function cb (err, authConfig) {
                if (!err) {
                    if (authConfig.rabbit) {    // override the credentials in options with the stored ones
                        options.login = authConfig.rabbit.username;
                        options.password = authConfig.rabbit.password;
                    setupConnection(options);    // connect with the (possibly updated) options
                } else {
                    // NOTE(review): as shown, a decryption failure is only logged — neither setupConnection nor
                    // on_failure is called, so the caller would get no result. Confirm against upstream.
                    log.error('Failed to get rabbitmq auth:', err);
        } else {
            // NOTE(review): the body of this branch is missing from this excerpt (presumably it connects with
            // the options as given) — confirm against upstream.





同时,源码中也有一些有趣的细节,比如如何避免密码在运行期和存储介质上暴露。具体而言:在成功连接 RabbitMQ 后,立即从配置对象中删除 RabbitMQ 登录密码(`delete options.password`);存储介质上存放 RabbitMQ 授权信息的文件是经过加密、压缩的,仅在使用时解压、解密;此外,在加解密文件的密钥设置上(见源码目录 source/common/cipher.js),owt-server 的注释是“Replace k with your key generator”,即提示使用者把默认密钥替换为自己的密钥生成方式。

