说说Egg.js中的多进程增强模型(一)

Egg.js是阿里推出的面向Node的企业级服务框架,这里只是讲一讲egg进阶中的插件开发会遇到多进程增强模型.


背景

Egg.js原理简介
稍微熟悉Egg.js原理的应该都知道master / agent / worker这三个进程的职责以及agent.js / app.js 这两个js文件,agent进程对应于agent.js,worker进程对应的是app.js,而worker进程是有多个的以集群方式进行工作的,并且最终部署的应用也是集群的方式部署在不同的机器上的,因此实际的worker是一个n x m的数量。

服务长链
服务端应用最典型的就是数据库连接(如: MySQL),尤其是微服务化后出现了各种各样的中间件(如:Eureka/Zookeeper/Disconf), 这样每一个应用都需要维护各种各样的长链接。

Egg的支持
对于长链接的创建方式,Egg提供了两种支持分别是:app.addSingleton(name, creator)多进程增强模型。两种方式分别在什么时候使用? addSingleton的方式可以直接参考Egg提供的例子MySQL,它可以保证一个application的对象(一个worker)只会有一个mysql实例,但是多个worker还是会有多个,对于MySQL这种在server端有链接池的是没有问题的,而且这样实现也简单易用,但是如果没有链接池的中间件来讲这样是一种极大的资源浪费 (n x m), 因此就会用到了多进程增强模型,下面具体说说。

多进程增强模型

Egg中的多进程增强模型实际上完全使用的就是Cluser-Client库(也是阿里开源),在GitHub上面有它工作原理和使用方式的介绍,只是不知道大家会不会和我一样看了一遍之后依然不知所云和无从下手的感觉,因此才写了这篇博客将源码阅读的理解记录下来。

Egg文档引用

首先创建RegistryClient代码如下:

const Base = require('sdk-base');
class RegistryClient extends Base {
...
}

然后创建一个APIClient类继承框架提供的快捷类APIClientBase, 代码如下:

const APIClientBase = require('cluster-client').APIClientBase;
const RegistryClient = require('./registry_client');

class APIClient extends APIClientBase {
  // 返回原始的客户端类
  get DataClient() {
    return RegistryClient;
  }

  subscribe(reg, listener) {
    this._client.subscribe(reg, listener);
  }

  publish(reg) {
    this._client.publish(reg);
  }
}

这里需要注意的是:

  1. DataClient方法需要返回前面定义好的RegistryClient类。
  2. _client属性是继承自父类, 直接就可以使用。

在Egg中嵌入上面的代码:

// app.js || agent.js
const APIClient = require('./APIClient'); // 上面那个模块
module.exports = app => {
  const config = app.config.apiClient;
  app.apiClient = new APIClient(Object.assign({}, config, { cluster: app.cluster });
  app.beforeStart(async () => {
    await app.apiClient.ready();
  });
};

根据上面的代码进行下面的梳理:

  1. 每一个agent / application都会有一个 APIClient实例。
  2. 所有的APIClient实例都会知晓RegistryClient类名。
  3. APIClient里面的方法会实际的调用一个_client属性。

理解:
这就是一个静态代理模式,所有想要对RegistryClient类的调用都要经过APIClient进行一次代理,所以只要保证RegistryClient的实例只有一个,其它所有的APIClient都可以通过某种方式将操作(请求)传达给RegistryClient就可以实现多进程单实例模式了。

注:上面使用方式是cluster-client的最佳实践,虽然抛开APIClient这个类也可以,这里直接跳过了是因为这样拆解更灵活并易于扩展,实际这里是需要进行两层的代理, RegistryClient会代理真正的业务client的调用(可以动态代理实现)并维护业务client的链接和事件接收,APIClient是用来mock所有业务client的api,让业务的使用更贴近真正业务client的调用。如(示意):APIClient.getData() --> RegistryClient.<DynamicDispatcher> --> zkClient.getData()

源码分析

有了上面的例子和思路,带着下面两个问题进行源码的分析:

  1. 如何保证RegistryClient的实例只有一个。
  2. APIClient类是如何和真实的client类进行交互的。

主从模式(Leader / Follower)
将多进程分为主(Leader)进程和从(Follower)进程,Leader只有一个并负责维护实际的第三方应用的链接及事件处理,Follower用于订阅Leader的一些事件及主动推送数据给Leader,也可以主动调用Leader执行一些操作,它们之间可以通过进程间通信的方式进行信息交换。在Egg中规定了agent进程是Leader,而其他worker进程作为Follower,代码如下isLeader: this.type === 'agent'

// node_modules/egg/egg.js
class EggApplication extends EggCore {
  constructor(options) {
    ...
    ...
    /**
     * Wrap the Client with Leader/Follower Pattern
     *
     * @description almost the same as Agent.cluster API, the only different is that this method create Follower.
     *
     * @see https://github.com/node-modules/cluster-client
     * @param {Function} clientClass - client class function
     * @param {Object} [options]
     *   - {Boolean} [autoGenerate] - whether generate delegate rule automatically, default is true
     *   - {Function} [formatKey] - a method to tranform the subscription info into a string,default is JSON.stringify
     *   - {Object} [transcode|JSON.stringify/parse]
     *     - {Function} encode - custom serialize method
     *     - {Function} decode - custom deserialize method
     *   - {Boolean} [isBroadcast] - whether broadcast subscrption result to all followers or just one, default is true
     *   - {Number} [responseTimeout] - response timeout, default is 3 seconds
     *   - {Number} [maxWaitTime|30000] - leader startup max time, default is 30 seconds
     * @return {ClientWrapper} wrapper
     */
    this.cluster = (clientClass, options) => {
      options = Object.assign({}, this.config.clusterClient, options, {
        // cluster need a port that can't conflict on the environment
        port: this.options.clusterPort,
        // agent worker is leader, app workers are follower
        isLeader: this.type === 'agent',
        logger: this.coreLogger,
      });
      const client = cluster(clientClass, options);
      this._patchClusterClient(client);
      return client;
    };
    ...
    ...
  }
  ...
  ...
}

上面👆代码是在agent 和 application对象上挂了一个名为cluser的创建方法,方法返回一个ClientWrapper实例。

Cluster-Client代码结构

|--cluster-client
  |--lib
    |--protocol
      --byte_buffer.js
      --packet.js
      --request.js
      --response.js
    --api_client.js
    --client.js
    --connections.js
    --const.js
    --default_logger.js
    --default_transcode.js
    --follower.js
    --index.js
    --leader.js
    --server.js
    --symbol.js
    --utils.js

这里我们先重点关注api_client.js / index.js / client.js这三个源码。回想到上面Egg文档给我提供的创建apiClient的例代码👇 :

new APIClient(Object.assign({}, config, { cluster: app.cluster });

我们就来到了cluster-client/lib/api_client.js, 这里将app.cluster方法传入,参考源码:

  1   constructor(options) {
  2     options = options || {};
  3     super(options);

  4     const wrapper = (options.cluster || cluster)(
  5       this.DataClient, this.clusterOptions
  6     );
  7     for (const from in this.delegates) {
  8       const to = this.delegates[from];
  9       wrapper.delegate(from, to);
  10    }
  11    this._client = wrapper.create(options);
  12    utils.delegateEvents(this._client, this);

  13    if (!options.initMethod) {
  14     this._client.ready(err => {
  15      this.ready(err ? err : true);
  16    });
  17   }
  18 }

第4行代码直接就调用了cluster方法创建了一个ClientWrapper实例,第11行调用了wrapper的create方法,这样我们就来到了cluster-client/lib/index.js:

// 去掉不分析的代码
...
create (...args) {
    ...
    function createRealClient() {
      return Reflect.construct(clientClass, args);
    }

    const client = new ClusterClient(Object.assign({
      createRealClient,
      descriptors: this._descriptors,
    }, this._options));
    ...
}
...

create方法主要是做了一些方法delegate生成和方法校验(下回分析),这里调用了包装了一个反射创建真实RegistryClient实例的方法并传入ClusterClient生成了一个实例最终返回给调用者其实就是APIClient中的_client,那么这样就来到了重点的cluster-client/lib/client.js, 方便查看这里直接就贴出[init]部分代码:

  async [init]() {
    const name = this.options.name;
    const port = this.options.port;
    let server;
    if (this.options.isLeader === true) {
      server = await ClusterServer.create(name, port);
      if (!server) {
        throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
      }
    } else if (this.options.isLeader === false) {
      // wait for leader active
      await ClusterServer.waitFor(port, this.options.maxWaitTime);
    } else {
      debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
      server = await ClusterServer.create(name, port);
    }

    if (server) {
      this[innerClient] = new Leader(Object.assign({ server }, this.options));
      debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
    } else {
      this[innerClient] = new Follower(this.options);
      debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
    }
...
}

代码非常清晰,如果是leader就会创建一个server并监听<port>, 如果是follower就链接server的<port>端口(可以查看server.js代码)。然后分别new了Leader和Follower两个实例并赋值给[innerClient]。当我们再查看Leader.js 的代码时,发现在构造函数里有 this._realClient = this.options.createRealClient();, 原来真正的client是在这个时间创建的,而查看Follower.js的代码时发现都是发送的tcp请求。这样上面的两个问题我们就都有了答案。

  1. agent进程起来后加载agent.js的时候设置了cluster方法,在beforeStart时通过new APIClient初始化_client属性的同时启动了一个tcp server并在new Leader对象时初始化正在的client。Egg的agent进程只有一个因此真正的client实例也只有一个。
  2. 当调用APIClient的方法时就会通过_client属性调用到ClusterClient,然后再调用它内部[innerClient], 而[innerClient]分别是LeaderFollower的实例,所以如果是leader就直接调用realClient否则就发送tcp请求。

至此cluster-client的多进程增强模式的主从原理就分析完成了,在实际的实现过程中具体的调用还是有一些规则和约束,如:delegates的设置以及subscribe / publish / invoke / invokeOneway分别是如何使用的还需要进一步了解。

下一篇:说说Egg.js中的多进程增强模型(二)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 207,248评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,681评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,443评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,475评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,458评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,185评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,451评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,112评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,609评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,083评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,163评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,803评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,357评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,357评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,590评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,636评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,925评论 2 344

推荐阅读更多精彩内容