async源码之series

前言

最近在看Node设计模式之异步编程的顺序异步迭代,简单的实现如下:

function series(tasks, callback) {
    let results = [];
    function iterate(index) {
        if (index === tasks.length) {
            return finish();
        }
        const task = tasks[index];
        task(function(err, res) {
            results.push(res);
            iterate(index + 1);
        });
    }

    function finish() {
        // 迭代完成的操作
        callback(null, results);
    }

    iterate(0);
}

series(
    [
        callback => {
            setTimeout(function() {
                console.log(456);
                callback(null, 1);
            }, 500);
        },
        callback => {
            console.log(123);
            callback(null, 2);
        }
    ],
    function(err, results) {
        console.log(results);
    }
);

// 456
// 123
// [1, 2]

而async库是一个非常流行的解决方案,在Node.js和JavaScript中来说,用于处理异步代码。它提供了一组功能,可以大大简化不同配置中一组任务的执行,并为异步处理集合提供了有用的帮助。

async库可以在实现复杂的异步控制流程时大大帮助我们,但是一个难题就是选择正确的库来解决问题。例如,对于顺序执行,有大约20个不同的函数可供选择。

好奇心起来,就想看看一个成熟的库跟我们简单实现的代码区别有多大。

series

按顺序运行任务集合中的函数,每个函数在前一个函数完成后运行。如果系列中的任何函数将错误传递给其回调函数,则不会运行更多函数,并立即使用错误值调用回调函数。否则,回调会在任务完成时收到一系列结果。

const async = require('async');

async.series({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback){
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    console.log(results);
    // results is now equal to: {one: 1, two: 2}
});

我们来看看源码,找到series方法,可以看到:

function series(tasks, callback) {
    _parallel(eachOfSeries, tasks, callback);
}

除了我们自己传的两个参数以外,默认还传了一个eachOfSeries,接着往下看:

function _parallel(eachfn, tasks, callback) {
    // noop:空的函数
    callback = callback || noop;
    // isArrayLike:检查'value'是否与array相似
    var results = isArrayLike(tasks) ? [] : {};

    eachfn(tasks, function (task, key, callback) {
        // wrapAsync:包装成异步
        wrapAsync(task)(function (err, result) {
            if (arguments.length > 2) {
                result = slice(arguments, 1);
            }
            results[key] = result;
            callback(err);
        });
    }, function (err) {
        callback(err, results);
    });
}

这里我们可以看到,_parallel方法其实就是eachOfSeries方法的调用。

先解释一下eachOfSeries这三个参数:

  • 第一个参数就是要执行的函数的集合。
  • 第二个参数可以看成每个函数的执行(wrapAsync可以先忽略掉,直接看成这一个函数)。
  • 第三个参数就是所有函数执行完后的回调。

让我们来看看eachOfSeries是如何的实现:

var eachOfSeries = doLimit(eachOfLimit, 1);

function eachOfLimit(coll, limit, iteratee, callback) {
    _eachOfLimit(limit)(coll, wrapAsync(iteratee), callback);
}

function doLimit(fn, limit) {
    return function (iterable, iteratee, callback) {
        return fn(iterable, limit, iteratee, callback);
    };
}

我们把上面进行转换,这样看起来更明了些:

var eachOfSeries = function(iterable, iteratee, callback) {
    return _eachOfLimit(1)(iterable, wrapAsync(iteratee), callback);
};

Soga,最终就是调用_eachOfLimit完成的:

// limit:一次异步操作的最大数量,传1可以看成串行,一个函数执行完才进行下一个
function _eachOfLimit(limit) {
    return function (obj, iteratee, callback) {
        // once:函数只运行一次
        callback = once(callback || noop);
        if (limit <= 0 || !obj) {
            return callback(null);
        }
        // iterator:迭代器,有根据类型分类,这边简单拿数组迭代器createArrayIterator来分析
        var nextElem = iterator(obj);
        var done = false;
        var running = 0;
        var looping = false;

        function iterateeCallback(err, value) {
            running -= 1;
            if (err) {
                done = true;
                callback(err);
            }
            else if (value === breakLoop || (done && running <= 0)) {
                done = true;
                return callback(null);
            }
            else if (!looping) {
                replenish();
            }
        }

        function replenish () {
            looping = true;
            while (running < limit && !done) {
                var elem = nextElem();
                if (elem === null) {
                    done = true;
                    if (running <= 0) {
                        callback(null);
                    }
                    return;
                }
                running += 1;
                // onlyOnce:函数只运行一次
                iteratee(elem.value, elem.key, onlyOnce(iterateeCallback));
            }
            looping = false;
        }
        
        // 递归
        replenish();
    };
}

function once(fn) {
    return function() {
        if (fn === null) return;
        var callFn = fn;
        fn = null;
        callFn.apply(this, arguments);
    };
}

// 闭包大法,拿取集合中的函数
function createArrayIterator(coll) {
    var i = -1;
    var len = coll.length;
    return function next() {
        return ++i < len ? {value: coll[i], key: i} : null;
    }
}

终于,看到series的真身了。实现其实就是replenish()的递归大法。因为要实现串行,所以在replenish()中控制running数为1,取出集合中一个函数执行,然后回调iterateeCallback(),running数减1,再调用replenish(),这样就能控制每个函数在前一个函数完成后运行。

说起来这流程还是比较简单,但是在异步编程里还是不太好理解,我们先来了解一下js执行机制,再举一个例子来看:

js执行机制
  • 同步的进入主线程,异步的进入Event Table并注册函数。
  • 当指定的事情完成时,Event Table会将这个函数移入Event Queue。
  • 主线程内的任务执行完毕为空,会去Event Queue读取对应的函数,进入主线程执行。
  • 上述过程会不断重复,也就是常说的Event Loop(事件循环)。

普通版

function a() {
    setTimeout(function() {
        console.log(456);
    }, 500);
}

function b() {
    console.log(123);
}

function c() {
    setTimeout(function() {
        console.log(789);
    }, 0);
}

a();
b();
c();

// 123
// 789
// 456

按顺序执行可以看到

  • a()中setTimeout进入Event Table,注册回调函数。
  • b(),执行console.log(123)。
  • c()中setTimeout进入Event Table,注册回调函数。
  • c()中setTimeout先完成,回调函数进入Event Queue。
  • c()中setTimeout 500ms后完成,回调函数进入Event Queue。
  • 主线程从Event Queue读取回调函数并执行。

series版

const async = require('async');

async.series(
    [
        callback => {
            setTimeout(function() {
                console.log(456);
                callback(null, 1);
            }, 500);
        },
        callback => {
            console.log(123);
            callback(null, 2);
        },
        callback => {
            setTimeout(function() {
                console.log(789);
                callback(null, 3);
            }, 0);
        }
    ],
    function(err, results) {
        console.log(results);
    }
);

// 456
// 123
// 789
// [ 2, 1, 3 ]

按我自己的理解,主线程和Event Loop都执行完称为一轮:

  1. 第一轮

    • 按照上面流程,主线程走到_eachOfLimit(),调用replenish()。根据while循环(运行数running < 一次异步操作的最大数量 limit),running += 1,进入集合中第一个函setTimeout数的调用,setTimeout进入Event Table,注册回调函数。
    • 回到while循环,running=limit,结束循环,结束主线程。
    • setTimeout事件完成,回调函数进入Event Queue。
    • 主线程从Event Queue读取回调函数并执行,回调iterateeCallback,running -= 1,调用replenish()。
  2. 第二轮

    • 重复第一轮。只要的区别在于集合中的第二个函数是同步的,所有是主线程一路执行下来。
  3. 第三轮

    • 重复第一轮。
  4. 第四轮

    • 集合中的三个函数已经都执行完了,通过iterator()闭包拿到是null,回调最终结果。
wrapAsync
function wrapAsync(asyncFn) {
    return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}

var supportsSymbol = typeof Symbol === 'function';

function isAsync(fn) {
    return supportsSymbol && fn[Symbol.toStringTag] === 'AsyncFunction';
}

wrapAsync()先判断是否异步函数,如果是es7 Async Functions的话调用asyncify,否则返回原函数。

function asyncify(func) {
    return initialParams(function (args, callback) {
        var result;
        try {
            result = func.apply(this, args);
        } catch (e) {
            return callback(e);
        }
        // if result is Promise object
        if (isObject(result) && typeof result.then === 'function') {
            result.then(function(value) {
                invokeCallback(callback, null, value);
            }, function(err) {
                invokeCallback(callback, err.message ? err : new Error(err));
            });
        } else {
            callback(null, result);
        }
    });
}

var initialParams = function (fn) {
    return function (/*...args, callback*/) {
        var args = slice(arguments);
        var callback = args.pop();
        fn.call(this, args, callback);
    };
};

采用同步功能并将其设置为异步,并将其返回值传递给回调函数。如果传递给asyncify的函数返回一个Promise,则该Promise的resolved/rejected状态将用于调用回调,而不仅仅是同步返回值。

总结

平日用惯async-await、promise,用起来简单,但也导致缺少思考。而尝试用原生js去模拟,阅读源码,却能带来更多的收获。

github地址,喜欢的支持star一下,Thanks♪(・ω・)ノ。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容

  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,385评论 8 265
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,101评论 1 32
  • 一、JavaScript单线程模型 JavaScript是单线程的,JavaScript只在一个线程上运行,但是浏...
    Brolly阅读 1,134评论 4 6
  • 文|JKCP 从小认识了周星星,他的无厘头给了我个方向,原来搞笑的人也很有魅力,初中认识了韩寒,他的文学影响了我,...
    JKCP阅读 386评论 0 6
  • 寒假开始了,乐宝最近两三天都在陪大姨奶家里的重外孙女玩。 对于孩子来说,不用上课,有小伙伴陪着玩耍,那是最喜欢的事...
    清清的海阅读 503评论 3 15